Commit c4059188 authored by Tom Lane's avatar Tom Lane

Fix unwanted flushing of libpq's input buffer when socket EOF is seen.

In commit 210eb9b7 I centralized libpq's logic for closing down
the backend communication socket, and made the new pqDropConnection
routine always reset the I/O buffers to empty.  Many of the call sites
previously had not had such code, and while that amounted to an oversight
in some cases, there was one place where it was intentional and necessary
*not* to flush the input buffer: pqReadData should never cause that to
happen, since we probably still want to process whatever data we read.

This is the true cause of the problem Robert was attempting to fix in
c3e7c24a, namely that libpq no longer reported the backend's final
ERROR message before reporting "server closed the connection unexpectedly".
But that only accidentally fixed it, by invoking parseInput before the
input buffer got flushed; and very likely there are timing scenarios
where we'd still lose the message before processing it.

To fix, pass a flag to pqDropConnection to tell it whether to flush the
input buffer or not.  On review I think flushing is actually correct for
every other call site.

Back-patch to 9.3 where the problem was introduced.  In HEAD, also improve
the comments added by c3e7c24a.
parent c3e7c24a
...@@ -391,9 +391,13 @@ pgthreadlock_t pg_g_threadlock = default_threadlock; ...@@ -391,9 +391,13 @@ pgthreadlock_t pg_g_threadlock = default_threadlock;
* Close any physical connection to the server, and reset associated * Close any physical connection to the server, and reset associated
* state inside the connection object. We don't release state that * state inside the connection object. We don't release state that
* would be needed to reconnect, though. * would be needed to reconnect, though.
*
* We can always flush the output buffer, since there's no longer any hope
* of sending that data. However, unprocessed input data might still be
* valuable, so the caller must tell us whether to flush that or not.
*/ */
void void
pqDropConnection(PGconn *conn) pqDropConnection(PGconn *conn, bool flushInput)
{ {
/* Drop any SSL state */ /* Drop any SSL state */
pqsecure_close(conn); pqsecure_close(conn);
...@@ -401,8 +405,10 @@ pqDropConnection(PGconn *conn) ...@@ -401,8 +405,10 @@ pqDropConnection(PGconn *conn)
if (conn->sock != PGINVALID_SOCKET) if (conn->sock != PGINVALID_SOCKET)
closesocket(conn->sock); closesocket(conn->sock);
conn->sock = PGINVALID_SOCKET; conn->sock = PGINVALID_SOCKET;
/* Discard any unread/unsent data */ /* Optionally discard any unread data */
if (flushInput)
conn->inStart = conn->inCursor = conn->inEnd = 0; conn->inStart = conn->inCursor = conn->inEnd = 0;
/* Always discard any unsent data */
conn->outCount = 0; conn->outCount = 0;
} }
...@@ -1510,7 +1516,7 @@ connectDBStart(PGconn *conn) ...@@ -1510,7 +1516,7 @@ connectDBStart(PGconn *conn)
return 1; return 1;
connect_errReturn: connect_errReturn:
pqDropConnection(conn); pqDropConnection(conn, true);
conn->status = CONNECTION_BAD; conn->status = CONNECTION_BAD;
return 0; return 0;
} }
...@@ -1732,7 +1738,7 @@ keep_going: /* We will come back to here until there is ...@@ -1732,7 +1738,7 @@ keep_going: /* We will come back to here until there is
{ {
if (!connectNoDelay(conn)) if (!connectNoDelay(conn))
{ {
pqDropConnection(conn); pqDropConnection(conn, true);
conn->addr_cur = addr_cur->ai_next; conn->addr_cur = addr_cur->ai_next;
continue; continue;
} }
...@@ -1742,7 +1748,7 @@ keep_going: /* We will come back to here until there is ...@@ -1742,7 +1748,7 @@ keep_going: /* We will come back to here until there is
appendPQExpBuffer(&conn->errorMessage, appendPQExpBuffer(&conn->errorMessage,
libpq_gettext("could not set socket to nonblocking mode: %s\n"), libpq_gettext("could not set socket to nonblocking mode: %s\n"),
SOCK_STRERROR(SOCK_ERRNO, sebuf, sizeof(sebuf))); SOCK_STRERROR(SOCK_ERRNO, sebuf, sizeof(sebuf)));
pqDropConnection(conn); pqDropConnection(conn, true);
conn->addr_cur = addr_cur->ai_next; conn->addr_cur = addr_cur->ai_next;
continue; continue;
} }
...@@ -1753,7 +1759,7 @@ keep_going: /* We will come back to here until there is ...@@ -1753,7 +1759,7 @@ keep_going: /* We will come back to here until there is
appendPQExpBuffer(&conn->errorMessage, appendPQExpBuffer(&conn->errorMessage,
libpq_gettext("could not set socket to close-on-exec mode: %s\n"), libpq_gettext("could not set socket to close-on-exec mode: %s\n"),
SOCK_STRERROR(SOCK_ERRNO, sebuf, sizeof(sebuf))); SOCK_STRERROR(SOCK_ERRNO, sebuf, sizeof(sebuf)));
pqDropConnection(conn); pqDropConnection(conn, true);
conn->addr_cur = addr_cur->ai_next; conn->addr_cur = addr_cur->ai_next;
continue; continue;
} }
...@@ -1800,7 +1806,7 @@ keep_going: /* We will come back to here until there is ...@@ -1800,7 +1806,7 @@ keep_going: /* We will come back to here until there is
if (err) if (err)
{ {
pqDropConnection(conn); pqDropConnection(conn, true);
conn->addr_cur = addr_cur->ai_next; conn->addr_cur = addr_cur->ai_next;
continue; continue;
} }
...@@ -1887,7 +1893,7 @@ keep_going: /* We will come back to here until there is ...@@ -1887,7 +1893,7 @@ keep_going: /* We will come back to here until there is
* failure and keep going if there are more addresses. * failure and keep going if there are more addresses.
*/ */
connectFailureMessage(conn, SOCK_ERRNO); connectFailureMessage(conn, SOCK_ERRNO);
pqDropConnection(conn); pqDropConnection(conn, true);
/* /*
* Try the next address, if any. * Try the next address, if any.
...@@ -1932,7 +1938,7 @@ keep_going: /* We will come back to here until there is ...@@ -1932,7 +1938,7 @@ keep_going: /* We will come back to here until there is
* error message. * error message.
*/ */
connectFailureMessage(conn, optval); connectFailureMessage(conn, optval);
pqDropConnection(conn); pqDropConnection(conn, true);
/* /*
* If more addresses remain, keep trying, just as in the * If more addresses remain, keep trying, just as in the
...@@ -2220,7 +2226,7 @@ keep_going: /* We will come back to here until there is ...@@ -2220,7 +2226,7 @@ keep_going: /* We will come back to here until there is
/* only retry once */ /* only retry once */
conn->allow_ssl_try = false; conn->allow_ssl_try = false;
/* Must drop the old connection */ /* Must drop the old connection */
pqDropConnection(conn); pqDropConnection(conn, true);
conn->status = CONNECTION_NEEDED; conn->status = CONNECTION_NEEDED;
goto keep_going; goto keep_going;
} }
...@@ -2331,7 +2337,7 @@ keep_going: /* We will come back to here until there is ...@@ -2331,7 +2337,7 @@ keep_going: /* We will come back to here until there is
{ {
conn->pversion = PG_PROTOCOL(2, 0); conn->pversion = PG_PROTOCOL(2, 0);
/* Must drop the old connection */ /* Must drop the old connection */
pqDropConnection(conn); pqDropConnection(conn, true);
conn->status = CONNECTION_NEEDED; conn->status = CONNECTION_NEEDED;
goto keep_going; goto keep_going;
} }
...@@ -2397,7 +2403,7 @@ keep_going: /* We will come back to here until there is ...@@ -2397,7 +2403,7 @@ keep_going: /* We will come back to here until there is
/* only retry once */ /* only retry once */
conn->wait_ssl_try = false; conn->wait_ssl_try = false;
/* Must drop the old connection */ /* Must drop the old connection */
pqDropConnection(conn); pqDropConnection(conn, true);
conn->status = CONNECTION_NEEDED; conn->status = CONNECTION_NEEDED;
goto keep_going; goto keep_going;
} }
...@@ -2413,7 +2419,7 @@ keep_going: /* We will come back to here until there is ...@@ -2413,7 +2419,7 @@ keep_going: /* We will come back to here until there is
/* only retry once */ /* only retry once */
conn->allow_ssl_try = false; conn->allow_ssl_try = false;
/* Must drop the old connection */ /* Must drop the old connection */
pqDropConnection(conn); pqDropConnection(conn, true);
conn->status = CONNECTION_NEEDED; conn->status = CONNECTION_NEEDED;
goto keep_going; goto keep_going;
} }
...@@ -2574,7 +2580,7 @@ keep_going: /* We will come back to here until there is ...@@ -2574,7 +2580,7 @@ keep_going: /* We will come back to here until there is
PQclear(res); PQclear(res);
conn->send_appname = false; conn->send_appname = false;
/* Must drop the old connection */ /* Must drop the old connection */
pqDropConnection(conn); pqDropConnection(conn, true);
conn->status = CONNECTION_NEEDED; conn->status = CONNECTION_NEEDED;
goto keep_going; goto keep_going;
} }
...@@ -2971,7 +2977,7 @@ closePGconn(PGconn *conn) ...@@ -2971,7 +2977,7 @@ closePGconn(PGconn *conn)
/* /*
* Close the connection, reset all transient state, flush I/O buffers. * Close the connection, reset all transient state, flush I/O buffers.
*/ */
pqDropConnection(conn); pqDropConnection(conn, true);
conn->status = CONNECTION_BAD; /* Well, not really _bad_ - just conn->status = CONNECTION_BAD; /* Well, not really _bad_ - just
* absent */ * absent */
conn->asyncStatus = PGASYNC_IDLE; conn->asyncStatus = PGASYNC_IDLE;
......
...@@ -1553,8 +1553,10 @@ sendFailed: ...@@ -1553,8 +1553,10 @@ sendFailed:
/* /*
* pqHandleSendFailure: try to clean up after failure to send command. * pqHandleSendFailure: try to clean up after failure to send command.
* *
* Primarily, what we want to accomplish here is to process any messages that * Primarily, what we want to accomplish here is to process any ERROR or
* the backend might have sent just before it died. * NOTICE messages that the backend might have sent just before it died.
* Since we're in IDLE state, all such messages will get sent to the notice
* processor.
* *
* NOTE: this routine should only be called in PGASYNC_IDLE state. * NOTE: this routine should only be called in PGASYNC_IDLE state.
*/ */
...@@ -1562,16 +1564,17 @@ void ...@@ -1562,16 +1564,17 @@ void
pqHandleSendFailure(PGconn *conn) pqHandleSendFailure(PGconn *conn)
{ {
/* /*
* Accept and parse any available input data. Note that if pqReadData * Accept and parse any available input data, ignoring I/O errors. Note
* decides the backend has closed the channel, it will close our side of * that if pqReadData decides the backend has closed the channel, it will
* the socket --- that's just what we want here. * close our side of the socket --- that's just what we want here.
*/ */
while (pqReadData(conn) > 0) while (pqReadData(conn) > 0)
parseInput(conn); parseInput(conn);
/* /*
* Make one attempt to parse available input messages even if we read no * Be sure to parse available input messages even if we read no data.
* data. * (Note: calling parseInput within the above loop isn't really necessary,
* but it prevents buffer bloat if there's a lot of data available.)
*/ */
parseInput(conn); parseInput(conn);
} }
......
...@@ -815,7 +815,8 @@ definitelyEOF: ...@@ -815,7 +815,8 @@ definitelyEOF:
/* Come here if lower-level code already set a suitable errorMessage */ /* Come here if lower-level code already set a suitable errorMessage */
definitelyFailed: definitelyFailed:
pqDropConnection(conn); /* Do *not* drop any already-read data; caller still wants it */
pqDropConnection(conn, false);
conn->status = CONNECTION_BAD; /* No more connection to backend */ conn->status = CONNECTION_BAD; /* No more connection to backend */
return -1; return -1;
} }
......
...@@ -446,8 +446,8 @@ handleSyncLoss(PGconn *conn, char id, int msgLength) ...@@ -446,8 +446,8 @@ handleSyncLoss(PGconn *conn, char id, int msgLength)
/* build an error result holding the error message */ /* build an error result holding the error message */
pqSaveErrorResult(conn); pqSaveErrorResult(conn);
conn->asyncStatus = PGASYNC_READY; /* drop out of GetResult wait loop */ conn->asyncStatus = PGASYNC_READY; /* drop out of GetResult wait loop */
/* flush input data since we're giving up on processing it */
pqDropConnection(conn); pqDropConnection(conn, true);
conn->status = CONNECTION_BAD; /* No more connection to backend */ conn->status = CONNECTION_BAD; /* No more connection to backend */
} }
......
...@@ -515,7 +515,7 @@ extern char *const pgresStatus[]; ...@@ -515,7 +515,7 @@ extern char *const pgresStatus[];
/* === in fe-connect.c === */ /* === in fe-connect.c === */
extern void pqDropConnection(PGconn *conn); extern void pqDropConnection(PGconn *conn, bool flushInput);
extern int pqPacketSend(PGconn *conn, char pack_type, extern int pqPacketSend(PGconn *conn, char pack_type,
const void *buf, size_t buf_len); const void *buf, size_t buf_len);
extern bool pqGetHomeDirectory(char *buf, int bufsize); extern bool pqGetHomeDirectory(char *buf, int bufsize);
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment