Commit 9206ced1 authored by Andres Freund's avatar Andres Freund

Clean up latch related code.

The larger part of this patch replaces usages of MyProc->procLatch
with MyLatch.  The latter works even early during backend startup,
where MyProc->procLatch doesn't yet.  While the affected code
shouldn't run in cases where it's not initialized, it might get copied
into places where it might.  Using MyLatch is simpler and a bit faster
to boot, so there's little point to stick with the previous coding.

While doing so I noticed some weaknesses around newly introduced uses
of latches that could lead to missed events, and an omitted
CHECK_FOR_INTERRUPTS() call in worker_spi.

As all the actual bugs are in v10 code, there doesn't seem to be
sufficient reason to backpatch this.

Author: Andres Freund
Discussion:
    https://postgr.es/m/20170606195321.sjmenrfgl2nu6j63@alap3.anarazel.de
    https://postgr.es/m/20170606210405.sim3yl6vpudhmufo@alap3.anarazel.de
Backpatch: -
parent e3a815d2
......@@ -527,9 +527,9 @@ WaitForParallelWorkersToFinish(ParallelContext *pcxt)
if (!anyone_alive)
break;
WaitLatch(&MyProc->procLatch, WL_LATCH_SET, -1,
WaitLatch(MyLatch, WL_LATCH_SET, -1,
WAIT_EVENT_PARALLEL_FINISH);
ResetLatch(&MyProc->procLatch);
ResetLatch(MyLatch);
}
if (pcxt->toc != NULL)
......
......@@ -172,9 +172,9 @@ mq_putmessage(char msgtype, const char *s, size_t len)
if (result != SHM_MQ_WOULD_BLOCK)
break;
WaitLatch(&MyProc->procLatch, WL_LATCH_SET, 0,
WaitLatch(MyLatch, WL_LATCH_SET, 0,
WAIT_EVENT_MQ_PUT_MESSAGE);
ResetLatch(&MyProc->procLatch);
ResetLatch(MyLatch);
CHECK_FOR_INTERRUPTS();
}
......
......@@ -1144,7 +1144,7 @@ WaitForBackgroundWorkerShutdown(BackgroundWorkerHandle *handle)
if (status == BGWH_STOPPED)
break;
rc = WaitLatch(&MyProc->procLatch,
rc = WaitLatch(MyLatch,
WL_LATCH_SET | WL_POSTMASTER_DEATH, 0,
WAIT_EVENT_BGWORKER_SHUTDOWN);
......@@ -1154,7 +1154,7 @@ WaitForBackgroundWorkerShutdown(BackgroundWorkerHandle *handle)
break;
}
ResetLatch(&MyProc->procLatch);
ResetLatch(MyLatch);
}
return status;
......
......@@ -176,7 +176,7 @@ libpqrcv_connect(const char *conninfo, bool logical, const char *appname,
? WL_SOCKET_READABLE
: WL_SOCKET_WRITEABLE);
rc = WaitLatchOrSocket(&MyProc->procLatch,
rc = WaitLatchOrSocket(MyLatch,
WL_POSTMASTER_DEATH |
WL_LATCH_SET | io_flag,
PQsocket(conn->streamConn),
......@@ -190,7 +190,7 @@ libpqrcv_connect(const char *conninfo, bool logical, const char *appname,
/* Interrupted? */
if (rc & WL_LATCH_SET)
{
ResetLatch(&MyProc->procLatch);
ResetLatch(MyLatch);
CHECK_FOR_INTERRUPTS();
}
......@@ -574,21 +574,22 @@ libpqrcv_PQexec(PGconn *streamConn, const char *query)
* the signal arrives in the middle of establishment of
* replication connection.
*/
ResetLatch(&MyProc->procLatch);
rc = WaitLatchOrSocket(&MyProc->procLatch,
rc = WaitLatchOrSocket(MyLatch,
WL_POSTMASTER_DEATH | WL_SOCKET_READABLE |
WL_LATCH_SET,
PQsocket(streamConn),
0,
WAIT_EVENT_LIBPQWALRECEIVER);
/* Emergency bailout? */
if (rc & WL_POSTMASTER_DEATH)
exit(1);
/* interrupted */
/* Interrupted? */
if (rc & WL_LATCH_SET)
{
ResetLatch(MyLatch);
CHECK_FOR_INTERRUPTS();
continue;
}
if (PQconsumeInput(streamConn) == 0)
return NULL; /* trouble */
......
......@@ -208,10 +208,15 @@ WaitForReplicationWorkerAttach(LogicalRepWorker *worker,
WL_LATCH_SET | WL_TIMEOUT | WL_POSTMASTER_DEATH,
1000L, WAIT_EVENT_BGWORKER_STARTUP);
/* emergency bailout if postmaster has died */
if (rc & WL_POSTMASTER_DEATH)
proc_exit(1);
ResetLatch(MyLatch);
if (rc & WL_LATCH_SET)
{
ResetLatch(MyLatch);
CHECK_FOR_INTERRUPTS();
}
}
return;
......@@ -440,10 +445,8 @@ logicalrep_worker_stop(Oid subid, Oid relid)
LWLockRelease(LogicalRepWorkerLock);
CHECK_FOR_INTERRUPTS();
/* Wait for signal. */
rc = WaitLatch(&MyProc->procLatch,
rc = WaitLatch(MyLatch,
WL_LATCH_SET | WL_TIMEOUT | WL_POSTMASTER_DEATH,
1000L, WAIT_EVENT_BGWORKER_STARTUP);
......@@ -451,7 +454,11 @@ logicalrep_worker_stop(Oid subid, Oid relid)
if (rc & WL_POSTMASTER_DEATH)
proc_exit(1);
ResetLatch(&MyProc->procLatch);
if (rc & WL_LATCH_SET)
{
ResetLatch(MyLatch);
CHECK_FOR_INTERRUPTS();
}
/* Check worker status. */
LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
......@@ -492,7 +499,7 @@ logicalrep_worker_stop(Oid subid, Oid relid)
CHECK_FOR_INTERRUPTS();
/* Wait for more work. */
rc = WaitLatch(&MyProc->procLatch,
rc = WaitLatch(MyLatch,
WL_LATCH_SET | WL_TIMEOUT | WL_POSTMASTER_DEATH,
1000L, WAIT_EVENT_BGWORKER_SHUTDOWN);
......@@ -500,7 +507,11 @@ logicalrep_worker_stop(Oid subid, Oid relid)
if (rc & WL_POSTMASTER_DEATH)
proc_exit(1);
ResetLatch(&MyProc->procLatch);
if (rc & WL_LATCH_SET)
{
ResetLatch(MyLatch);
CHECK_FOR_INTERRUPTS();
}
}
}
......@@ -876,7 +887,7 @@ ApplyLauncherMain(Datum main_arg)
}
/* Wait for more work. */
rc = WaitLatch(&MyProc->procLatch,
rc = WaitLatch(MyLatch,
WL_LATCH_SET | WL_TIMEOUT | WL_POSTMASTER_DEATH,
wait_time,
WAIT_EVENT_LOGICAL_LAUNCHER_MAIN);
......@@ -885,13 +896,17 @@ ApplyLauncherMain(Datum main_arg)
if (rc & WL_POSTMASTER_DEATH)
proc_exit(1);
if (rc & WL_LATCH_SET)
{
ResetLatch(MyLatch);
CHECK_FOR_INTERRUPTS();
}
if (got_SIGHUP)
{
got_SIGHUP = false;
ProcessConfigFile(PGC_SIGHUP);
}
ResetLatch(&MyProc->procLatch);
}
LogicalRepCtx->launcher_pid = 0;
......
......@@ -191,7 +191,7 @@ wait_for_relation_state_change(Oid relid, char expected_state)
if (!worker)
return false;
rc = WaitLatch(&MyProc->procLatch,
rc = WaitLatch(MyLatch,
WL_LATCH_SET | WL_TIMEOUT | WL_POSTMASTER_DEATH,
1000L, WAIT_EVENT_LOGICAL_SYNC_STATE_CHANGE);
......@@ -199,7 +199,7 @@ wait_for_relation_state_change(Oid relid, char expected_state)
if (rc & WL_POSTMASTER_DEATH)
proc_exit(1);
ResetLatch(&MyProc->procLatch);
ResetLatch(MyLatch);
}
return false;
......@@ -236,7 +236,7 @@ wait_for_worker_state_change(char expected_state)
if (MyLogicalRepWorker->relstate == expected_state)
return true;
rc = WaitLatch(&MyProc->procLatch,
rc = WaitLatch(MyLatch,
WL_LATCH_SET | WL_TIMEOUT | WL_POSTMASTER_DEATH,
1000L, WAIT_EVENT_LOGICAL_SYNC_STATE_CHANGE);
......@@ -244,7 +244,7 @@ wait_for_worker_state_change(char expected_state)
if (rc & WL_POSTMASTER_DEATH)
proc_exit(1);
ResetLatch(&MyProc->procLatch);
ResetLatch(MyLatch);
}
return false;
......@@ -604,7 +604,7 @@ copy_read_data(void *outbuf, int minread, int maxread)
/*
* Wait for more data or latch.
*/
rc = WaitLatchOrSocket(&MyProc->procLatch,
rc = WaitLatchOrSocket(MyLatch,
WL_SOCKET_READABLE | WL_LATCH_SET |
WL_TIMEOUT | WL_POSTMASTER_DEATH,
fd, 1000L, WAIT_EVENT_LOGICAL_SYNC_DATA);
......@@ -613,7 +613,7 @@ copy_read_data(void *outbuf, int minread, int maxread)
if (rc & WL_POSTMASTER_DEATH)
proc_exit(1);
ResetLatch(&MyProc->procLatch);
ResetLatch(MyLatch);
}
return bytesread;
......
......@@ -1146,7 +1146,7 @@ LogicalRepApplyLoop(XLogRecPtr last_received)
/*
* Wait for more data or latch.
*/
rc = WaitLatchOrSocket(&MyProc->procLatch,
rc = WaitLatchOrSocket(MyLatch,
WL_SOCKET_READABLE | WL_LATCH_SET |
WL_TIMEOUT | WL_POSTMASTER_DEATH,
fd, NAPTIME_PER_CYCLE,
......@@ -1156,6 +1156,12 @@ LogicalRepApplyLoop(XLogRecPtr last_received)
if (rc & WL_POSTMASTER_DEATH)
proc_exit(1);
if (rc & WL_LATCH_SET)
{
ResetLatch(MyLatch);
CHECK_FOR_INTERRUPTS();
}
if (got_SIGHUP)
{
got_SIGHUP = false;
......@@ -1209,8 +1215,6 @@ LogicalRepApplyLoop(XLogRecPtr last_received)
send_feedback(last_received, requestReply, requestReply);
}
ResetLatch(&MyProc->procLatch);
}
}
......
......@@ -68,14 +68,14 @@ ConditionVariablePrepareToSleep(ConditionVariable *cv)
{
cv_wait_event_set = CreateWaitEventSet(TopMemoryContext, 1);
AddWaitEventToSet(cv_wait_event_set, WL_LATCH_SET, PGINVALID_SOCKET,
&MyProc->procLatch, NULL);
MyLatch, NULL);
}
/*
* Reset my latch before adding myself to the queue and before entering
* the caller's predicate loop.
*/
ResetLatch(&MyProc->procLatch);
ResetLatch(MyLatch);
/* Add myself to the wait queue. */
SpinLockAcquire(&cv->mutex);
......@@ -135,7 +135,7 @@ ConditionVariableSleep(ConditionVariable *cv, uint32 wait_event_info)
WaitEventSetWait(cv_wait_event_set, -1, &event, 1, wait_event_info);
/* Reset latch before testing whether we can return. */
ResetLatch(&MyProc->procLatch);
ResetLatch(MyLatch);
/*
* If this process has been taken out of the wait list, then we know
......
......@@ -235,6 +235,8 @@ worker_spi_main(Datum main_arg)
if (rc & WL_POSTMASTER_DEATH)
proc_exit(1);
CHECK_FOR_INTERRUPTS();
/*
* In case of a SIGHUP, just reload the configuration.
*/
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment