Commit 03a571a4 authored by Magnus Hagander's avatar Magnus Hagander

Add wrapper function libpqrcv_PQexec() in the walreceiver that uses async

libpq to send queries, making the waiting for responses interruptible on
platforms where PQexec() can't normally be interrupted by signals, such
as win32.

Fujii Masao and Magnus Hagander
parent 5b89ef38
......@@ -10,7 +10,7 @@
*
*
* IDENTIFICATION
* $PostgreSQL: pgsql/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c,v 1.8 2010/03/21 00:17:58 petere Exp $
* $PostgreSQL: pgsql/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c,v 1.9 2010/04/19 14:10:45 mha Exp $
*
*-------------------------------------------------------------------------
*/
......@@ -54,6 +54,7 @@ static void libpqrcv_disconnect(void);
/* Prototypes for private functions */
static bool libpq_select(int timeout_ms);
static PGresult *libpqrcv_PQexec(const char *query);
/*
* Module load callback
......@@ -97,7 +98,7 @@ libpqrcv_connect(char *conninfo, XLogRecPtr startpoint)
* Get the system identifier and timeline ID as a DataRow message from the
* primary server.
*/
res = PQexec(streamConn, "IDENTIFY_SYSTEM");
res = libpqrcv_PQexec("IDENTIFY_SYSTEM");
if (PQresultStatus(res) != PGRES_TUPLES_OK)
{
PQclear(res);
......@@ -149,11 +150,14 @@ libpqrcv_connect(char *conninfo, XLogRecPtr startpoint)
/* Start streaming from the point requested by startup process */
snprintf(cmd, sizeof(cmd), "START_REPLICATION %X/%X",
startpoint.xlogid, startpoint.xrecoff);
res = PQexec(streamConn, cmd);
res = libpqrcv_PQexec(cmd);
if (PQresultStatus(res) != PGRES_COPY_OUT)
{
PQclear(res);
ereport(ERROR,
(errmsg("could not start WAL streaming: %s",
PQerrorMessage(streamConn))));
}
PQclear(res);
justconnected = true;
......@@ -224,6 +228,84 @@ libpq_select(int timeout_ms)
return true;
}
/*
* Send a query and wait for the results by using the asynchronous libpq
* functions and the backend version of select().
*
* We must not use the regular blocking libpq functions like PQexec()
* since they are uninterruptible by signals on some platforms, such as
* Windows.
*
* We must also not use vanilla select() here since it cannot handle the
* signal emulation layer on Windows.
*
* The function is modeled on PQexec() in libpq, but only implements
* those parts that are in use in the walreceiver.
*
* Queries are always executed on the connection in streamConn.
*/
static PGresult *
libpqrcv_PQexec(const char *query)
{
PGresult *result = NULL;
PGresult *lastResult = NULL;
/*
* PQexec() silently discards any prior query results on the
* connection. This is not required for walreceiver since it's
* expected that walsender won't generate any such junk results.
*/
/*
* Submit a query. Since we don't use non-blocking mode, this also
* can block. But its risk is relatively small, so we ignore that
* for now.
*/
if (!PQsendQuery(streamConn, query))
return NULL;
for (;;)
{
/*
* Receive data until PQgetResult is ready to get the result
* without blocking.
*/
while (PQisBusy(streamConn))
{
/*
* We don't need to break down the sleep into smaller increments,
* and check for interrupts after each nap, since we can just
* elog(FATAL) within SIGTERM signal handler if the signal
* arrives in the middle of establishment of replication connection.
*/
if (!libpq_select(-1))
continue; /* interrupted */
if (PQconsumeInput(streamConn) == 0)
return NULL; /* trouble */
}
/*
* Emulate the PQexec()'s behavior of returning the last result
* when there are many.
* Since walsender will never generate multiple results, we skip
* the concatenation of error messages.
*/
result = PQgetResult(streamConn);
if (result == NULL)
break; /* query is complete */
PQclear(lastResult);
lastResult = result;
if (PQresultStatus(lastResult) == PGRES_COPY_IN ||
PQresultStatus(lastResult) == PGRES_COPY_OUT ||
PQstatus(streamConn) == CONNECTION_BAD)
break;
}
return lastResult;
}
/*
* Disconnect connection to primary, if any.
*/
......
......@@ -29,7 +29,7 @@
*
*
* IDENTIFICATION
* $PostgreSQL: pgsql/src/backend/replication/walreceiver.c,v 1.8 2010/04/13 08:16:09 mha Exp $
* $PostgreSQL: pgsql/src/backend/replication/walreceiver.c,v 1.9 2010/04/19 14:10:45 mha Exp $
*
*-------------------------------------------------------------------------
*/
......@@ -86,8 +86,8 @@ static void DisableWalRcvImmediateExit(void);
* We can't just exit(1) within SIGTERM signal handler, because the signal
* might arrive in the middle of some critical operation, like while we're
* holding a spinlock. We also can't just set a flag in signal handler and
* check it in the main loop, because we perform some blocking libpq
* operations like PQexec(), which can take a long time to finish.
* check it in the main loop, because we perform some blocking operations
* like libpqrcv_PQexec(), which can take a long time to finish.
*
* We use a combined approach: When WalRcvImmediateInterruptOK is true, it's
* safe for the signal handler to elog(FATAL) immediately. Otherwise it just
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment