Commit f3a4d7e7 authored by Tom Lane's avatar Tom Lane

Distinguish wait-for-connection from wait-for-write-ready on Windows.

The API for WaitLatch and friends followed the Unix convention in which
waiting for a socket connection to complete is identical to waiting for
the socket to accept a write.  While Windows provides a select(2)
emulation that agrees with that, the native WaitForMultipleObjects API
treats them as quite different --- and for some bizarre reason, it will
report a not-yet-connected socket as write-ready.  libpq itself has so
far escaped dealing with this because it waits with select(), but in
libpqwalreceiver.c we want to wait using WaitLatchOrSocket.  The semantics
mismatch resulted in replication connection failures on Windows, but only
for remote connections (apparently, localhost connections complete
immediately, or at least too fast for anyone to have noticed the problem
in single-machine testing).

To fix, introduce an additional WL_SOCKET_CONNECTED wait flag for
WaitLatchOrSocket, which is identical to WL_SOCKET_WRITEABLE on
non-Windows, but results in waiting for FD_CONNECT events on Windows.

Ideally, we would also distinguish the two conditions in the API for
PQconnectPoll(), but changing that API at this point seems infeasible.
Instead, cheat by checking for PQstatus() == CONNECTION_STARTED to
determine that we're still waiting for the connection to complete.
(This is a cheat mainly because CONNECTION_STARTED is documented as an
internal state rather than something callers should rely on.  Perhaps
we ought to change the documentation ... but this patch doesn't.)

Per reports from Jobin Augustine and Igor Neyman.  Back-patch to v10
where commit 1e8a8500 exposed this longstanding shortcoming.

Andres Freund, minor fix and some code review/beautification by me

Discussion: https://postgr.es/m/CAHBggj8g2T+ZDcACZ2FmzX9CTxkWjKBsHd6NkYB4i9Ojf6K1Fw@mail.gmail.com
parent 480f1f43
...@@ -168,13 +168,18 @@ libpqrcv_connect(const char *conninfo, bool logical, const char *appname, ...@@ -168,13 +168,18 @@ libpqrcv_connect(const char *conninfo, bool logical, const char *appname,
status = PGRES_POLLING_WRITING; status = PGRES_POLLING_WRITING;
do do
{ {
/* Wait for socket ready and/or other events. */
int io_flag; int io_flag;
int rc; int rc;
io_flag = (status == PGRES_POLLING_READING if (status == PGRES_POLLING_READING)
? WL_SOCKET_READABLE io_flag = WL_SOCKET_READABLE;
: WL_SOCKET_WRITEABLE); #ifdef WIN32
/* Windows needs a different test while waiting for connection-made */
else if (PQstatus(conn->streamConn) == CONNECTION_STARTED)
io_flag = WL_SOCKET_CONNECTED;
#endif
else
io_flag = WL_SOCKET_WRITEABLE;
rc = WaitLatchOrSocket(MyLatch, rc = WaitLatchOrSocket(MyLatch,
WL_POSTMASTER_DEATH | WL_POSTMASTER_DEATH |
......
...@@ -344,9 +344,9 @@ WaitLatch(volatile Latch *latch, int wakeEvents, long timeout, ...@@ -344,9 +344,9 @@ WaitLatch(volatile Latch *latch, int wakeEvents, long timeout,
* Like WaitLatch, but with an extra socket argument for WL_SOCKET_* * Like WaitLatch, but with an extra socket argument for WL_SOCKET_*
* conditions. * conditions.
* *
* When waiting on a socket, EOF and error conditions are reported by * When waiting on a socket, EOF and error conditions always cause the socket
* returning the socket as readable/writable or both, depending on * to be reported as readable/writable/connected, so that the caller can deal
* WL_SOCKET_READABLE/WL_SOCKET_WRITEABLE being specified. * with the condition.
* *
* NB: These days this is just a wrapper around the WaitEventSet API. When * NB: These days this is just a wrapper around the WaitEventSet API. When
* using a latch very frequently, consider creating a longer living * using a latch very frequently, consider creating a longer living
...@@ -374,11 +374,11 @@ WaitLatchOrSocket(volatile Latch *latch, int wakeEvents, pgsocket sock, ...@@ -374,11 +374,11 @@ WaitLatchOrSocket(volatile Latch *latch, int wakeEvents, pgsocket sock,
AddWaitEventToSet(set, WL_POSTMASTER_DEATH, PGINVALID_SOCKET, AddWaitEventToSet(set, WL_POSTMASTER_DEATH, PGINVALID_SOCKET,
NULL, NULL); NULL, NULL);
if (wakeEvents & (WL_SOCKET_READABLE | WL_SOCKET_WRITEABLE)) if (wakeEvents & WL_SOCKET_MASK)
{ {
int ev; int ev;
ev = wakeEvents & (WL_SOCKET_READABLE | WL_SOCKET_WRITEABLE); ev = wakeEvents & WL_SOCKET_MASK;
AddWaitEventToSet(set, ev, sock, NULL, NULL); AddWaitEventToSet(set, ev, sock, NULL, NULL);
} }
...@@ -390,8 +390,7 @@ WaitLatchOrSocket(volatile Latch *latch, int wakeEvents, pgsocket sock, ...@@ -390,8 +390,7 @@ WaitLatchOrSocket(volatile Latch *latch, int wakeEvents, pgsocket sock,
{ {
ret |= event.events & (WL_LATCH_SET | ret |= event.events & (WL_LATCH_SET |
WL_POSTMASTER_DEATH | WL_POSTMASTER_DEATH |
WL_SOCKET_READABLE | WL_SOCKET_MASK);
WL_SOCKET_WRITEABLE);
} }
FreeWaitEventSet(set); FreeWaitEventSet(set);
...@@ -640,10 +639,13 @@ FreeWaitEventSet(WaitEventSet *set) ...@@ -640,10 +639,13 @@ FreeWaitEventSet(WaitEventSet *set)
* Add an event to the set. Possible events are: * Add an event to the set. Possible events are:
* - WL_LATCH_SET: Wait for the latch to be set * - WL_LATCH_SET: Wait for the latch to be set
* - WL_POSTMASTER_DEATH: Wait for postmaster to die * - WL_POSTMASTER_DEATH: Wait for postmaster to die
* - WL_SOCKET_READABLE: Wait for socket to become readable * - WL_SOCKET_READABLE: Wait for socket to become readable,
* can be combined in one event with WL_SOCKET_WRITEABLE * can be combined in one event with other WL_SOCKET_* events
* - WL_SOCKET_WRITEABLE: Wait for socket to become writeable * - WL_SOCKET_WRITEABLE: Wait for socket to become writeable,
* can be combined with WL_SOCKET_READABLE * can be combined with other WL_SOCKET_* events
* - WL_SOCKET_CONNECTED: Wait for socket connection to be established,
* can be combined with other WL_SOCKET_* events (on non-Windows
* platforms, this is the same as WL_SOCKET_WRITEABLE)
* *
* Returns the offset in WaitEventSet->events (starting from 0), which can be * Returns the offset in WaitEventSet->events (starting from 0), which can be
* used to modify previously added wait events using ModifyWaitEvent(). * used to modify previously added wait events using ModifyWaitEvent().
...@@ -652,9 +654,9 @@ FreeWaitEventSet(WaitEventSet *set) ...@@ -652,9 +654,9 @@ FreeWaitEventSet(WaitEventSet *set)
* i.e. it must be a process-local latch initialized with InitLatch, or a * i.e. it must be a process-local latch initialized with InitLatch, or a
* shared latch associated with the current process by calling OwnLatch. * shared latch associated with the current process by calling OwnLatch.
* *
* In the WL_SOCKET_READABLE/WRITEABLE case, EOF and error conditions are * In the WL_SOCKET_READABLE/WRITEABLE/CONNECTED cases, EOF and error
* reported by returning the socket as readable/writable or both, depending on * conditions cause the socket to be reported as readable/writable/connected,
* WL_SOCKET_READABLE/WRITEABLE being specified. * so that the caller can deal with the condition.
* *
* The user_data pointer specified here will be set for the events returned * The user_data pointer specified here will be set for the events returned
* by WaitEventSetWait(), allowing to easily associate additional data with * by WaitEventSetWait(), allowing to easily associate additional data with
...@@ -685,8 +687,7 @@ AddWaitEventToSet(WaitEventSet *set, uint32 events, pgsocket fd, Latch *latch, ...@@ -685,8 +687,7 @@ AddWaitEventToSet(WaitEventSet *set, uint32 events, pgsocket fd, Latch *latch,
} }
/* waiting for socket readiness without a socket indicates a bug */ /* waiting for socket readiness without a socket indicates a bug */
if (fd == PGINVALID_SOCKET && if (fd == PGINVALID_SOCKET && (events & WL_SOCKET_MASK))
(events & (WL_SOCKET_READABLE | WL_SOCKET_WRITEABLE)))
elog(ERROR, "cannot wait on socket event without a socket"); elog(ERROR, "cannot wait on socket event without a socket");
event = &set->events[set->nevents]; event = &set->events[set->nevents];
...@@ -885,6 +886,8 @@ WaitEventAdjustWin32(WaitEventSet *set, WaitEvent *event) ...@@ -885,6 +886,8 @@ WaitEventAdjustWin32(WaitEventSet *set, WaitEvent *event)
flags |= FD_READ; flags |= FD_READ;
if (event->events & WL_SOCKET_WRITEABLE) if (event->events & WL_SOCKET_WRITEABLE)
flags |= FD_WRITE; flags |= FD_WRITE;
if (event->events & WL_SOCKET_CONNECTED)
flags |= FD_CONNECT;
if (*handle == WSA_INVALID_EVENT) if (*handle == WSA_INVALID_EVENT)
{ {
...@@ -1395,7 +1398,7 @@ WaitEventSetWaitBlock(WaitEventSet *set, int cur_timeout, ...@@ -1395,7 +1398,7 @@ WaitEventSetWaitBlock(WaitEventSet *set, int cur_timeout,
returned_events++; returned_events++;
} }
} }
else if (cur_event->events & (WL_SOCKET_READABLE | WL_SOCKET_WRITEABLE)) else if (cur_event->events & WL_SOCKET_MASK)
{ {
WSANETWORKEVENTS resEvents; WSANETWORKEVENTS resEvents;
HANDLE handle = set->handles[cur_event->pos + 1]; HANDLE handle = set->handles[cur_event->pos + 1];
...@@ -1432,13 +1435,16 @@ WaitEventSetWaitBlock(WaitEventSet *set, int cur_timeout, ...@@ -1432,13 +1435,16 @@ WaitEventSetWaitBlock(WaitEventSet *set, int cur_timeout,
/* writeable */ /* writeable */
occurred_events->events |= WL_SOCKET_WRITEABLE; occurred_events->events |= WL_SOCKET_WRITEABLE;
} }
if ((cur_event->events & WL_SOCKET_CONNECTED) &&
(resEvents.lNetworkEvents & FD_CONNECT))
{
/* connected */
occurred_events->events |= WL_SOCKET_CONNECTED;
}
if (resEvents.lNetworkEvents & FD_CLOSE) if (resEvents.lNetworkEvents & FD_CLOSE)
{ {
/* EOF */ /* EOF/error, so signal all caller-requested socket flags */
if (cur_event->events & WL_SOCKET_READABLE) occurred_events->events |= (cur_event->events & WL_SOCKET_MASK);
occurred_events->events |= WL_SOCKET_READABLE;
if (cur_event->events & WL_SOCKET_WRITEABLE)
occurred_events->events |= WL_SOCKET_WRITEABLE;
} }
if (occurred_events->events != 0) if (occurred_events->events != 0)
......
...@@ -126,6 +126,16 @@ typedef struct Latch ...@@ -126,6 +126,16 @@ typedef struct Latch
#define WL_SOCKET_WRITEABLE (1 << 2) #define WL_SOCKET_WRITEABLE (1 << 2)
#define WL_TIMEOUT (1 << 3) /* not for WaitEventSetWait() */ #define WL_TIMEOUT (1 << 3) /* not for WaitEventSetWait() */
#define WL_POSTMASTER_DEATH (1 << 4) #define WL_POSTMASTER_DEATH (1 << 4)
#ifdef WIN32
#define WL_SOCKET_CONNECTED (1 << 5)
#else
/* avoid having to to deal with case on platforms not requiring it */
#define WL_SOCKET_CONNECTED WL_SOCKET_WRITEABLE
#endif
#define WL_SOCKET_MASK (WL_SOCKET_READABLE | \
WL_SOCKET_WRITEABLE | \
WL_SOCKET_CONNECTED)
typedef struct WaitEvent typedef struct WaitEvent
{ {
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment