Commit b5dd50f2 authored by Tom Lane's avatar Tom Lane

Rewrite async-connection loop in libpqwalreceiver.c, once again.

The original coding in commit 1e8a8500 didn't use PQconnectPoll per
spec, and while the rewrite in e434ad39 is closer, it still doesn't
guarantee to wait until the socket is read-ready or write-ready (as
appropriate) before calling PQconnectPoll.  It's not clear whether
that omission is causing the continuing failures on buildfarm member
bowerbird; but given the lack of other explanations meeting the
available facts, let's tighten that up and see what happens.

An independent issue in the same loop was that it had a race condition
whereby it could clear the process's latch without having serviced an
interrupt request, causing failure to respond to a cancel while waiting
for connection (the very problem 1e8a8500 was meant to fix).

Discussion: https://postgr.es/m/7295.1489596949@sss.pgh.pa.us
parent 1ea60ad6
...@@ -159,41 +159,41 @@ libpqrcv_connect(const char *conninfo, bool logical, const char *appname, ...@@ -159,41 +159,41 @@ libpqrcv_connect(const char *conninfo, bool logical, const char *appname,
/* /*
* Poll connection until we have OK or FAILED status. * Poll connection until we have OK or FAILED status.
* *
* Note that the initial state after PQconnectStartParams is * Per spec for PQconnectPoll, first wait till socket is write-ready.
* PGRES_POLLING_WRITING.
*/ */
for (status = PGRES_POLLING_WRITING; status = PGRES_POLLING_WRITING;
status != PGRES_POLLING_OK && status != PGRES_POLLING_FAILED; do
status = PQconnectPoll(conn->streamConn))
{ {
/* Sleep a bit if waiting for socket. */ /* Wait for socket ready and/or other events. */
if (status == PGRES_POLLING_READING || int io_flag;
status == PGRES_POLLING_WRITING)
{
int extra_flag;
int rc; int rc;
extra_flag = (status == PGRES_POLLING_READING io_flag = (status == PGRES_POLLING_READING
? WL_SOCKET_READABLE ? WL_SOCKET_READABLE
: WL_SOCKET_WRITEABLE); : WL_SOCKET_WRITEABLE);
ResetLatch(&MyProc->procLatch);
rc = WaitLatchOrSocket(&MyProc->procLatch, rc = WaitLatchOrSocket(&MyProc->procLatch,
WL_POSTMASTER_DEATH | WL_POSTMASTER_DEATH |
WL_LATCH_SET | extra_flag, WL_LATCH_SET | io_flag,
PQsocket(conn->streamConn), PQsocket(conn->streamConn),
0, 0,
WAIT_EVENT_LIBPQWALRECEIVER); WAIT_EVENT_LIBPQWALRECEIVER);
/* Emergency bailout. */ /* Emergency bailout? */
if (rc & WL_POSTMASTER_DEATH) if (rc & WL_POSTMASTER_DEATH)
exit(1); exit(1);
/* Interrupted. */ /* Interrupted? */
if (rc & WL_LATCH_SET) if (rc & WL_LATCH_SET)
{
ResetLatch(&MyProc->procLatch);
CHECK_FOR_INTERRUPTS(); CHECK_FOR_INTERRUPTS();
} }
}
/* If socket is ready, advance the libpq state machine */
if (rc & io_flag)
status = PQconnectPoll(conn->streamConn);
} while (status != PGRES_POLLING_OK && status != PGRES_POLLING_FAILED);
if (PQstatus(conn->streamConn) != CONNECTION_OK) if (PQstatus(conn->streamConn) != CONNECTION_OK)
{ {
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment