Commit 1e8a8500 authored by Peter Eisentraut's avatar Peter Eisentraut

Use asynchronous connect API in libpqwalreceiver

This makes the connection attempt from CREATE SUBSCRIPTION and from
WalReceiver interruptable by the user in case the libpq connection is
hanging.  The previous coding required immediate shutdown (SIGQUIT) of
PostgreSQL in that situation.

From: Petr Jelinek <petr.jelinek@2ndquadrant.com>
Tested-by: default avatarThom Brown <thom@linux.com>
parent 9eb344fa
...@@ -3340,8 +3340,8 @@ pgstat_get_wait_client(WaitEventClient w) ...@@ -3340,8 +3340,8 @@ pgstat_get_wait_client(WaitEventClient w)
case WAIT_EVENT_WAL_RECEIVER_WAIT_START: case WAIT_EVENT_WAL_RECEIVER_WAIT_START:
event_name = "WalReceiverWaitStart"; event_name = "WalReceiverWaitStart";
break; break;
case WAIT_EVENT_LIBPQWALRECEIVER_READ: case WAIT_EVENT_LIBPQWALRECEIVER:
event_name = "LibPQWalReceiverRead"; event_name = "LibPQWalReceiver";
break; break;
case WAIT_EVENT_WAL_SENDER_WAIT_WAL: case WAIT_EVENT_WAL_SENDER_WAIT_WAL:
event_name = "WalSenderWaitForWAL"; event_name = "WalSenderWaitForWAL";
......
...@@ -113,6 +113,7 @@ libpqrcv_connect(const char *conninfo, bool logical, const char *appname, ...@@ -113,6 +113,7 @@ libpqrcv_connect(const char *conninfo, bool logical, const char *appname,
char **err) char **err)
{ {
WalReceiverConn *conn; WalReceiverConn *conn;
PostgresPollingStatusType status;
const char *keys[5]; const char *keys[5];
const char *vals[5]; const char *vals[5];
int i = 0; int i = 0;
...@@ -146,7 +147,51 @@ libpqrcv_connect(const char *conninfo, bool logical, const char *appname, ...@@ -146,7 +147,51 @@ libpqrcv_connect(const char *conninfo, bool logical, const char *appname,
Assert(i < sizeof(keys)); Assert(i < sizeof(keys));
conn = palloc0(sizeof(WalReceiverConn)); conn = palloc0(sizeof(WalReceiverConn));
conn->streamConn = PQconnectdbParams(keys, vals, /* expand_dbname = */ true); conn->streamConn = PQconnectStartParams(keys, vals,
/* expand_dbname = */ true);
if (PQstatus(conn->streamConn) == CONNECTION_BAD)
{
*err = pchomp(PQerrorMessage(conn->streamConn));
return NULL;
}
/* Poll connection. */
do
{
/* Determine current state of the connection. */
status = PQconnectPoll(conn->streamConn);
/* Sleep a bit if waiting for socket. */
if (status == PGRES_POLLING_READING ||
status == PGRES_POLLING_WRITING)
{
int extra_flag;
int rc;
extra_flag = (status == PGRES_POLLING_READING
? WL_SOCKET_READABLE
: WL_SOCKET_WRITEABLE);
ResetLatch(&MyProc->procLatch);
rc = WaitLatchOrSocket(&MyProc->procLatch,
WL_POSTMASTER_DEATH |
WL_LATCH_SET | extra_flag,
PQsocket(conn->streamConn),
0,
WAIT_EVENT_LIBPQWALRECEIVER);
/* Emergency bailout. */
if (rc & WL_POSTMASTER_DEATH)
exit(1);
/* Interrupted. */
if (rc & WL_LATCH_SET)
CHECK_FOR_INTERRUPTS();
}
/* Otherwise loop until we have OK or FAILED status. */
} while (status != PGRES_POLLING_OK && status != PGRES_POLLING_FAILED);
if (PQstatus(conn->streamConn) != CONNECTION_OK) if (PQstatus(conn->streamConn) != CONNECTION_OK)
{ {
*err = pchomp(PQerrorMessage(conn->streamConn)); *err = pchomp(PQerrorMessage(conn->streamConn));
...@@ -529,7 +574,7 @@ libpqrcv_PQexec(PGconn *streamConn, const char *query) ...@@ -529,7 +574,7 @@ libpqrcv_PQexec(PGconn *streamConn, const char *query)
WL_LATCH_SET, WL_LATCH_SET,
PQsocket(streamConn), PQsocket(streamConn),
0, 0,
WAIT_EVENT_LIBPQWALRECEIVER_READ); WAIT_EVENT_LIBPQWALRECEIVER);
if (rc & WL_POSTMASTER_DEATH) if (rc & WL_POSTMASTER_DEATH)
exit(1); exit(1);
......
...@@ -764,7 +764,7 @@ typedef enum ...@@ -764,7 +764,7 @@ typedef enum
WAIT_EVENT_CLIENT_WRITE, WAIT_EVENT_CLIENT_WRITE,
WAIT_EVENT_SSL_OPEN_SERVER, WAIT_EVENT_SSL_OPEN_SERVER,
WAIT_EVENT_WAL_RECEIVER_WAIT_START, WAIT_EVENT_WAL_RECEIVER_WAIT_START,
WAIT_EVENT_LIBPQWALRECEIVER_READ, WAIT_EVENT_LIBPQWALRECEIVER,
WAIT_EVENT_WAL_SENDER_WAIT_WAL, WAIT_EVENT_WAL_SENDER_WAIT_WAL,
WAIT_EVENT_WAL_SENDER_WRITE_DATA WAIT_EVENT_WAL_SENDER_WRITE_DATA
} WaitEventClient; } WaitEventClient;
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment