Commit a1a789eb authored by Tom Lane

In walreceiver, don't try to do ereport() in a signal handler.

This is quite unsafe, even for the case of ereport(FATAL) where we won't
return control to the interrupted code, and despite this code's use of
a flag to restrict the areas where we'd try to do it.  It's possible
for example that we interrupt malloc or free while that's holding a lock
that's meant to protect against cross-thread interference.  Then, any
attempt to do malloc or free within ereport() will result in a deadlock,
preventing the walreceiver process from exiting in response to SIGTERM.
We hypothesize that this explains some hard-to-reproduce failures seen
in the buildfarm.

Hence, get rid of the immediate-exit code in WalRcvShutdownHandler,
as well as the logic associated with WalRcvImmediateInterruptOK.
Instead, we need to take care that potentially-blocking operations
in the walreceiver's data transmission logic (libpqwalreceiver.c)
will respond reasonably promptly to the process's latch becoming
set and then call ProcessWalRcvInterrupts.  Much of the needed code
for that was already present in libpqwalreceiver.c.  I refactored
things a bit so that all the uses of PQgetResult use latch-aware
waiting, but didn't need to do much more.

These changes should be enough to ensure that libpqwalreceiver.c
will respond promptly to SIGTERM whenever it's waiting to receive
data.  In principle, it could block for a long time while waiting
to send data too, and this patch does nothing to guard against that.
I think that that hazard is mostly theoretical though: such blocking
should occur only if we fill the kernel's data transmission buffers,
and we don't generally send enough data to make that happen without
waiting for input.  If we find out that the hazard isn't just
theoretical, we could fix it by using PQsetnonblocking, but that
would require more ticklish changes than I care to make now.

This is a bug fix, but it seems like too big a change to push into
the back branches without much more testing than there's time for
right now.  Perhaps we'll back-patch once we have more confidence
in the change.

Patch by me; thanks to Thomas Munro for review.

Discussion: https://postgr.es/m/20190416070119.GK2673@paquier.xyz
parent 9c592896
...@@ -99,6 +99,7 @@ static WalReceiverFunctionsType PQWalReceiverFunctions = { ...@@ -99,6 +99,7 @@ static WalReceiverFunctionsType PQWalReceiverFunctions = {
/* Prototypes for private functions */ /* Prototypes for private functions */
static PGresult *libpqrcv_PQexec(PGconn *streamConn, const char *query); static PGresult *libpqrcv_PQexec(PGconn *streamConn, const char *query);
static PGresult *libpqrcv_PQgetResult(PGconn *streamConn);
static char *stringlist_to_identifierstr(PGconn *conn, List *strings); static char *stringlist_to_identifierstr(PGconn *conn, List *strings);
/* /*
...@@ -196,7 +197,7 @@ libpqrcv_connect(const char *conninfo, bool logical, const char *appname, ...@@ -196,7 +197,7 @@ libpqrcv_connect(const char *conninfo, bool logical, const char *appname,
if (rc & WL_LATCH_SET) if (rc & WL_LATCH_SET)
{ {
ResetLatch(MyLatch); ResetLatch(MyLatch);
CHECK_FOR_INTERRUPTS(); ProcessWalRcvInterrupts();
} }
/* If socket is ready, advance the libpq state machine */ /* If socket is ready, advance the libpq state machine */
...@@ -456,6 +457,10 @@ libpqrcv_endstreaming(WalReceiverConn *conn, TimeLineID *next_tli) ...@@ -456,6 +457,10 @@ libpqrcv_endstreaming(WalReceiverConn *conn, TimeLineID *next_tli)
{ {
PGresult *res; PGresult *res;
/*
* Send copy-end message. As in libpqrcv_PQexec, this could theoretically
* block, but the risk seems small.
*/
if (PQputCopyEnd(conn->streamConn, NULL) <= 0 || if (PQputCopyEnd(conn->streamConn, NULL) <= 0 ||
PQflush(conn->streamConn)) PQflush(conn->streamConn))
ereport(ERROR, ereport(ERROR,
...@@ -472,7 +477,7 @@ libpqrcv_endstreaming(WalReceiverConn *conn, TimeLineID *next_tli) ...@@ -472,7 +477,7 @@ libpqrcv_endstreaming(WalReceiverConn *conn, TimeLineID *next_tli)
* If we had not yet received CopyDone from the backend, PGRES_COPY_OUT is * If we had not yet received CopyDone from the backend, PGRES_COPY_OUT is
* also possible in case we aborted the copy in mid-stream. * also possible in case we aborted the copy in mid-stream.
*/ */
res = PQgetResult(conn->streamConn); res = libpqrcv_PQgetResult(conn->streamConn);
if (PQresultStatus(res) == PGRES_TUPLES_OK) if (PQresultStatus(res) == PGRES_TUPLES_OK)
{ {
/* /*
...@@ -486,7 +491,7 @@ libpqrcv_endstreaming(WalReceiverConn *conn, TimeLineID *next_tli) ...@@ -486,7 +491,7 @@ libpqrcv_endstreaming(WalReceiverConn *conn, TimeLineID *next_tli)
PQclear(res); PQclear(res);
/* the result set should be followed by CommandComplete */ /* the result set should be followed by CommandComplete */
res = PQgetResult(conn->streamConn); res = libpqrcv_PQgetResult(conn->streamConn);
} }
else if (PQresultStatus(res) == PGRES_COPY_OUT) else if (PQresultStatus(res) == PGRES_COPY_OUT)
{ {
...@@ -499,7 +504,7 @@ libpqrcv_endstreaming(WalReceiverConn *conn, TimeLineID *next_tli) ...@@ -499,7 +504,7 @@ libpqrcv_endstreaming(WalReceiverConn *conn, TimeLineID *next_tli)
pchomp(PQerrorMessage(conn->streamConn))))); pchomp(PQerrorMessage(conn->streamConn)))));
/* CommandComplete should follow */ /* CommandComplete should follow */
res = PQgetResult(conn->streamConn); res = libpqrcv_PQgetResult(conn->streamConn);
} }
if (PQresultStatus(res) != PGRES_COMMAND_OK) if (PQresultStatus(res) != PGRES_COMMAND_OK)
...@@ -509,7 +514,7 @@ libpqrcv_endstreaming(WalReceiverConn *conn, TimeLineID *next_tli) ...@@ -509,7 +514,7 @@ libpqrcv_endstreaming(WalReceiverConn *conn, TimeLineID *next_tli)
PQclear(res); PQclear(res);
/* Verify that there are no more results */ /* Verify that there are no more results */
res = PQgetResult(conn->streamConn); res = libpqrcv_PQgetResult(conn->streamConn);
if (res != NULL) if (res != NULL)
ereport(ERROR, ereport(ERROR,
(errmsg("unexpected result after CommandComplete: %s", (errmsg("unexpected result after CommandComplete: %s",
...@@ -572,12 +577,11 @@ libpqrcv_readtimelinehistoryfile(WalReceiverConn *conn, ...@@ -572,12 +577,11 @@ libpqrcv_readtimelinehistoryfile(WalReceiverConn *conn,
* The function is modeled on PQexec() in libpq, but only implements * The function is modeled on PQexec() in libpq, but only implements
* those parts that are in use in the walreceiver api. * those parts that are in use in the walreceiver api.
* *
* Queries are always executed on the connection in streamConn. * May return NULL, rather than an error result, on failure.
*/ */
static PGresult * static PGresult *
libpqrcv_PQexec(PGconn *streamConn, const char *query) libpqrcv_PQexec(PGconn *streamConn, const char *query)
{ {
PGresult *result = NULL;
PGresult *lastResult = NULL; PGresult *lastResult = NULL;
/* /*
...@@ -588,16 +592,47 @@ libpqrcv_PQexec(PGconn *streamConn, const char *query) ...@@ -588,16 +592,47 @@ libpqrcv_PQexec(PGconn *streamConn, const char *query)
*/ */
/* /*
* Submit a query. Since we don't use non-blocking mode, this also can * Submit the query. Since we don't use non-blocking mode, this could
* block. But its risk is relatively small, so we ignore that for now. * theoretically block. In practice, since we don't send very long query
* strings, the risk seems negligible.
*/ */
if (!PQsendQuery(streamConn, query)) if (!PQsendQuery(streamConn, query))
return NULL; return NULL;
for (;;) for (;;)
{ {
/* Wait for, and collect, the next PGresult. */
PGresult *result;
result = libpqrcv_PQgetResult(streamConn);
if (result == NULL)
break; /* query is complete, or failure */
/*
* Emulate PQexec()'s behavior of returning the last result when there
* are many. We are fine with returning just last error message.
*/
PQclear(lastResult);
lastResult = result;
if (PQresultStatus(lastResult) == PGRES_COPY_IN ||
PQresultStatus(lastResult) == PGRES_COPY_OUT ||
PQresultStatus(lastResult) == PGRES_COPY_BOTH ||
PQstatus(streamConn) == CONNECTION_BAD)
break;
}
return lastResult;
}
/*
* Perform the equivalent of PQgetResult(), but watch for interrupts.
*/
static PGresult *
libpqrcv_PQgetResult(PGconn *streamConn)
{
/* /*
* Receive data until PQgetResult is ready to get the result without * Collect data until PQgetResult is ready to get the result without
* blocking. * blocking.
*/ */
while (PQisBusy(streamConn)) while (PQisBusy(streamConn))
...@@ -606,10 +641,8 @@ libpqrcv_PQexec(PGconn *streamConn, const char *query) ...@@ -606,10 +641,8 @@ libpqrcv_PQexec(PGconn *streamConn, const char *query)
/* /*
* We don't need to break down the sleep into smaller increments, * We don't need to break down the sleep into smaller increments,
* since we'll get interrupted by signals and can either handle * since we'll get interrupted by signals and can handle any
* interrupts here or elog(FATAL) within SIGTERM signal handler if * interrupts here.
* the signal arrives in the middle of establishment of
* replication connection.
*/ */
rc = WaitLatchOrSocket(MyLatch, rc = WaitLatchOrSocket(MyLatch,
WL_EXIT_ON_PM_DEATH | WL_SOCKET_READABLE | WL_EXIT_ON_PM_DEATH | WL_SOCKET_READABLE |
...@@ -622,37 +655,19 @@ libpqrcv_PQexec(PGconn *streamConn, const char *query) ...@@ -622,37 +655,19 @@ libpqrcv_PQexec(PGconn *streamConn, const char *query)
if (rc & WL_LATCH_SET) if (rc & WL_LATCH_SET)
{ {
ResetLatch(MyLatch); ResetLatch(MyLatch);
CHECK_FOR_INTERRUPTS(); ProcessWalRcvInterrupts();
} }
/* Consume whatever data is available from the socket */ /* Consume whatever data is available from the socket */
if (PQconsumeInput(streamConn) == 0) if (PQconsumeInput(streamConn) == 0)
{ {
/* trouble; drop whatever we had and return NULL */ /* trouble; return NULL */
PQclear(lastResult);
return NULL; return NULL;
} }
} }
/* /* Now we can collect and return the next PGresult */
* Emulate PQexec()'s behavior of returning the last result when there return PQgetResult(streamConn);
* are many. We are fine with returning just last error message.
*/
result = PQgetResult(streamConn);
if (result == NULL)
break; /* query is complete */
PQclear(lastResult);
lastResult = result;
if (PQresultStatus(lastResult) == PGRES_COPY_IN ||
PQresultStatus(lastResult) == PGRES_COPY_OUT ||
PQresultStatus(lastResult) == PGRES_COPY_BOTH ||
PQstatus(streamConn) == CONNECTION_BAD)
break;
}
return lastResult;
} }
/* /*
...@@ -716,13 +731,13 @@ libpqrcv_receive(WalReceiverConn *conn, char **buffer, ...@@ -716,13 +731,13 @@ libpqrcv_receive(WalReceiverConn *conn, char **buffer,
{ {
PGresult *res; PGresult *res;
res = PQgetResult(conn->streamConn); res = libpqrcv_PQgetResult(conn->streamConn);
if (PQresultStatus(res) == PGRES_COMMAND_OK) if (PQresultStatus(res) == PGRES_COMMAND_OK)
{ {
PQclear(res); PQclear(res);
/* Verify that there are no more results. */ /* Verify that there are no more results. */
res = PQgetResult(conn->streamConn); res = libpqrcv_PQgetResult(conn->streamConn);
if (res != NULL) if (res != NULL)
{ {
PQclear(res); PQclear(res);
...@@ -886,7 +901,7 @@ libpqrcv_processTuples(PGresult *pgres, WalRcvExecResult *walres, ...@@ -886,7 +901,7 @@ libpqrcv_processTuples(PGresult *pgres, WalRcvExecResult *walres,
{ {
char *cstrs[MaxTupleAttributeNumber]; char *cstrs[MaxTupleAttributeNumber];
CHECK_FOR_INTERRUPTS(); ProcessWalRcvInterrupts();
/* Do the allocations in temporary context. */ /* Do the allocations in temporary context. */
oldcontext = MemoryContextSwitchTo(rowcontext); oldcontext = MemoryContextSwitchTo(rowcontext);
......
...@@ -112,28 +112,7 @@ static struct ...@@ -112,28 +112,7 @@ static struct
static StringInfoData reply_message; static StringInfoData reply_message;
static StringInfoData incoming_message; static StringInfoData incoming_message;
/*
* About SIGTERM handling:
*
* We can't just exit(1) within SIGTERM signal handler, because the signal
* might arrive in the middle of some critical operation, like while we're
* holding a spinlock. We also can't just set a flag in signal handler and
* check it in the main loop, because we perform some blocking operations
* like libpqrcv_PQexec(), which can take a long time to finish.
*
* We use a combined approach: When WalRcvImmediateInterruptOK is true, it's
* safe for the signal handler to elog(FATAL) immediately. Otherwise it just
* sets got_SIGTERM flag, which is checked in the main loop when convenient.
*
* This is very much like what regular backends do with ImmediateInterruptOK,
* ProcessInterrupts() etc.
*/
static volatile bool WalRcvImmediateInterruptOK = false;
/* Prototypes for private functions */ /* Prototypes for private functions */
static void ProcessWalRcvInterrupts(void);
static void EnableWalRcvImmediateExit(void);
static void DisableWalRcvImmediateExit(void);
static void WalRcvFetchTimeLineHistoryFiles(TimeLineID first, TimeLineID last); static void WalRcvFetchTimeLineHistoryFiles(TimeLineID first, TimeLineID last);
static void WalRcvWaitForStartPosition(XLogRecPtr *startpoint, TimeLineID *startpointTLI); static void WalRcvWaitForStartPosition(XLogRecPtr *startpoint, TimeLineID *startpointTLI);
static void WalRcvDie(int code, Datum arg); static void WalRcvDie(int code, Datum arg);
...@@ -151,7 +130,20 @@ static void WalRcvShutdownHandler(SIGNAL_ARGS); ...@@ -151,7 +130,20 @@ static void WalRcvShutdownHandler(SIGNAL_ARGS);
static void WalRcvQuickDieHandler(SIGNAL_ARGS); static void WalRcvQuickDieHandler(SIGNAL_ARGS);
static void /*
* Process any interrupts the walreceiver process may have received.
* This should be called any time the process's latch has become set.
*
* Currently, only SIGTERM is of interest. We can't just exit(1) within the
* SIGTERM signal handler, because the signal might arrive in the middle of
* some critical operation, like while we're holding a spinlock. Instead, the
* signal handler sets a flag variable as well as setting the process's latch.
* We must check the flag (by calling ProcessWalRcvInterrupts) anytime the
* latch has become set. Operations that could block for a long time, such as
* reading from a remote server, must pay attention to the latch too; see
* libpqrcv_PQgetResult for example.
*/
void
ProcessWalRcvInterrupts(void) ProcessWalRcvInterrupts(void)
{ {
/* /*
...@@ -163,26 +155,12 @@ ProcessWalRcvInterrupts(void) ...@@ -163,26 +155,12 @@ ProcessWalRcvInterrupts(void)
if (got_SIGTERM) if (got_SIGTERM)
{ {
WalRcvImmediateInterruptOK = false;
ereport(FATAL, ereport(FATAL,
(errcode(ERRCODE_ADMIN_SHUTDOWN), (errcode(ERRCODE_ADMIN_SHUTDOWN),
errmsg("terminating walreceiver process due to administrator command"))); errmsg("terminating walreceiver process due to administrator command")));
} }
} }
static void
EnableWalRcvImmediateExit(void)
{
WalRcvImmediateInterruptOK = true;
ProcessWalRcvInterrupts();
}
static void
DisableWalRcvImmediateExit(void)
{
WalRcvImmediateInterruptOK = false;
ProcessWalRcvInterrupts();
}
/* Main entry point for walreceiver process */ /* Main entry point for walreceiver process */
void void
...@@ -292,12 +270,10 @@ WalReceiverMain(void) ...@@ -292,12 +270,10 @@ WalReceiverMain(void)
PG_SETMASK(&UnBlockSig); PG_SETMASK(&UnBlockSig);
/* Establish the connection to the primary for XLOG streaming */ /* Establish the connection to the primary for XLOG streaming */
EnableWalRcvImmediateExit();
wrconn = walrcv_connect(conninfo, false, cluster_name[0] ? cluster_name : "walreceiver", &err); wrconn = walrcv_connect(conninfo, false, cluster_name[0] ? cluster_name : "walreceiver", &err);
if (!wrconn) if (!wrconn)
ereport(ERROR, ereport(ERROR,
(errmsg("could not connect to the primary server: %s", err))); (errmsg("could not connect to the primary server: %s", err)));
DisableWalRcvImmediateExit();
/* /*
* Save user-visible connection string. This clobbers the original * Save user-visible connection string. This clobbers the original
...@@ -336,7 +312,6 @@ WalReceiverMain(void) ...@@ -336,7 +312,6 @@ WalReceiverMain(void)
* Check that we're connected to a valid server using the * Check that we're connected to a valid server using the
* IDENTIFY_SYSTEM replication command. * IDENTIFY_SYSTEM replication command.
*/ */
EnableWalRcvImmediateExit();
primary_sysid = walrcv_identify_system(wrconn, &primaryTLI); primary_sysid = walrcv_identify_system(wrconn, &primaryTLI);
snprintf(standby_sysid, sizeof(standby_sysid), UINT64_FORMAT, snprintf(standby_sysid, sizeof(standby_sysid), UINT64_FORMAT,
...@@ -348,7 +323,6 @@ WalReceiverMain(void) ...@@ -348,7 +323,6 @@ WalReceiverMain(void)
errdetail("The primary's identifier is %s, the standby's identifier is %s.", errdetail("The primary's identifier is %s, the standby's identifier is %s.",
primary_sysid, standby_sysid))); primary_sysid, standby_sysid)));
} }
DisableWalRcvImmediateExit();
/* /*
* Confirm that the current timeline of the primary is the same or * Confirm that the current timeline of the primary is the same or
...@@ -509,6 +483,8 @@ WalReceiverMain(void) ...@@ -509,6 +483,8 @@ WalReceiverMain(void)
if (rc & WL_LATCH_SET) if (rc & WL_LATCH_SET)
{ {
ResetLatch(walrcv->latch); ResetLatch(walrcv->latch);
ProcessWalRcvInterrupts();
if (walrcv->force_reply) if (walrcv->force_reply)
{ {
/* /*
...@@ -577,9 +553,7 @@ WalReceiverMain(void) ...@@ -577,9 +553,7 @@ WalReceiverMain(void)
* The backend finished streaming. Exit streaming COPY-mode from * The backend finished streaming. Exit streaming COPY-mode from
* our side, too. * our side, too.
*/ */
EnableWalRcvImmediateExit();
walrcv_endstreaming(wrconn, &primaryTLI); walrcv_endstreaming(wrconn, &primaryTLI);
DisableWalRcvImmediateExit();
/* /*
* If the server had switched to a new timeline that we didn't * If the server had switched to a new timeline that we didn't
...@@ -726,9 +700,7 @@ WalRcvFetchTimeLineHistoryFiles(TimeLineID first, TimeLineID last) ...@@ -726,9 +700,7 @@ WalRcvFetchTimeLineHistoryFiles(TimeLineID first, TimeLineID last)
(errmsg("fetching timeline history file for timeline %u from primary server", (errmsg("fetching timeline history file for timeline %u from primary server",
tli))); tli)));
EnableWalRcvImmediateExit();
walrcv_readtimelinehistoryfile(wrconn, tli, &fname, &content, &len); walrcv_readtimelinehistoryfile(wrconn, tli, &fname, &content, &len);
DisableWalRcvImmediateExit();
/* /*
* Check that the filename on the master matches what we * Check that the filename on the master matches what we
...@@ -805,7 +777,7 @@ WalRcvSigUsr1Handler(SIGNAL_ARGS) ...@@ -805,7 +777,7 @@ WalRcvSigUsr1Handler(SIGNAL_ARGS)
errno = save_errno; errno = save_errno;
} }
/* SIGTERM: set flag for main loop, or shutdown immediately if safe */ /* SIGTERM: set flag for ProcessWalRcvInterrupts */
static void static void
WalRcvShutdownHandler(SIGNAL_ARGS) WalRcvShutdownHandler(SIGNAL_ARGS)
{ {
...@@ -816,10 +788,6 @@ WalRcvShutdownHandler(SIGNAL_ARGS) ...@@ -816,10 +788,6 @@ WalRcvShutdownHandler(SIGNAL_ARGS)
if (WalRcv->latch) if (WalRcv->latch)
SetLatch(WalRcv->latch); SetLatch(WalRcv->latch);
/* Don't joggle the elbow of proc_exit */
if (!proc_exit_inprogress && WalRcvImmediateInterruptOK)
ProcessWalRcvInterrupts();
errno = save_errno; errno = save_errno;
} }
......
...@@ -302,6 +302,7 @@ walrcv_clear_result(WalRcvExecResult *walres) ...@@ -302,6 +302,7 @@ walrcv_clear_result(WalRcvExecResult *walres)
/* prototypes for functions in walreceiver.c */ /* prototypes for functions in walreceiver.c */
extern void WalReceiverMain(void) pg_attribute_noreturn(); extern void WalReceiverMain(void) pg_attribute_noreturn();
extern void ProcessWalRcvInterrupts(void);
/* prototypes for functions in walreceiverfuncs.c */ /* prototypes for functions in walreceiverfuncs.c */
extern Size WalRcvShmemSize(void); extern Size WalRcvShmemSize(void);
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment