Commit ff44fba4 authored by Andres Freund's avatar Andres Freund

Replace walsender's latch with the general shared latch.

Relying on the normal shared latch simplifies interrupt/signal
handling because we can rely on all signal handlers setting the proc
latch. That in turn allows us to avoid the use of
ImmediateInterruptOK, which arguably isn't correct because
WaitLatchOrSocket isn't declared to be immediately interruptible.

Also change sections that wait on the walsender's latch to notice
interrupts quicker/more reliably and make them more consistent with
each other.

This is part of a larger "get rid of ImmediateInterruptOK" series.

Discussion: 20150115020335.GZ5245@awork2.anarazel.de
parent 20af53d7
...@@ -1294,15 +1294,21 @@ throttle(size_t increment) ...@@ -1294,15 +1294,21 @@ throttle(size_t increment)
/* Only sleep if the transfer is faster than it should be. */ /* Only sleep if the transfer is faster than it should be. */
if (sleep > 0) if (sleep > 0)
{ {
ResetLatch(&MyWalSnd->latch); ResetLatch(MyLatch);
/* We're eating a potentially set latch, so check for interrupts */
CHECK_FOR_INTERRUPTS();
/* /*
* (TAR_SEND_SIZE / throttling_sample * elapsed_min_unit) should be * (TAR_SEND_SIZE / throttling_sample * elapsed_min_unit) should be
* the maximum time to sleep. Thus the cast to long is safe. * the maximum time to sleep. Thus the cast to long is safe.
*/ */
wait_result = WaitLatch(&MyWalSnd->latch, wait_result = WaitLatch(MyLatch,
WL_LATCH_SET | WL_TIMEOUT | WL_POSTMASTER_DEATH, WL_LATCH_SET | WL_TIMEOUT | WL_POSTMASTER_DEATH,
(long) (sleep / 1000)); (long) (sleep / 1000));
if (wait_result & WL_LATCH_SET)
CHECK_FOR_INTERRUPTS();
} }
else else
{ {
......
...@@ -1081,6 +1081,11 @@ WalSndWriteData(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid, ...@@ -1081,6 +1081,11 @@ WalSndWriteData(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid,
if (!PostmasterIsAlive()) if (!PostmasterIsAlive())
exit(1); exit(1);
/* Clear any already-pending wakeups */
ResetLatch(MyLatch);
CHECK_FOR_INTERRUPTS();
/* Process any requests or signals received recently */ /* Process any requests or signals received recently */
if (got_SIGHUP) if (got_SIGHUP)
{ {
...@@ -1092,9 +1097,6 @@ WalSndWriteData(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid, ...@@ -1092,9 +1097,6 @@ WalSndWriteData(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid,
/* Check for input from the client */ /* Check for input from the client */
ProcessRepliesIfAny(); ProcessRepliesIfAny();
/* Clear any already-pending wakeups */
ResetLatch(&MyWalSnd->latch);
/* Try to flush pending output to the client */ /* Try to flush pending output to the client */
if (pq_flush_if_writable() != 0) if (pq_flush_if_writable() != 0)
WalSndShutdown(); WalSndShutdown();
...@@ -1117,15 +1119,12 @@ WalSndWriteData(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid, ...@@ -1117,15 +1119,12 @@ WalSndWriteData(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid,
WL_SOCKET_WRITEABLE | WL_SOCKET_READABLE | WL_TIMEOUT; WL_SOCKET_WRITEABLE | WL_SOCKET_READABLE | WL_TIMEOUT;
/* Sleep until something happens or we time out */ /* Sleep until something happens or we time out */
ImmediateInterruptOK = true; WaitLatchOrSocket(MyLatch, wakeEvents,
CHECK_FOR_INTERRUPTS();
WaitLatchOrSocket(&MyWalSnd->latch, wakeEvents,
MyProcPort->sock, sleeptime); MyProcPort->sock, sleeptime);
ImmediateInterruptOK = false;
} }
/* reactivate latch so WalSndLoop knows to continue */ /* reactivate latch so WalSndLoop knows to continue */
SetLatch(&MyWalSnd->latch); SetLatch(MyLatch);
} }
/* /*
...@@ -1165,6 +1164,11 @@ WalSndWaitForWal(XLogRecPtr loc) ...@@ -1165,6 +1164,11 @@ WalSndWaitForWal(XLogRecPtr loc)
if (!PostmasterIsAlive()) if (!PostmasterIsAlive())
exit(1); exit(1);
/* Clear any already-pending wakeups */
ResetLatch(MyLatch);
CHECK_FOR_INTERRUPTS();
/* Process any requests or signals received recently */ /* Process any requests or signals received recently */
if (got_SIGHUP) if (got_SIGHUP)
{ {
...@@ -1176,9 +1180,6 @@ WalSndWaitForWal(XLogRecPtr loc) ...@@ -1176,9 +1180,6 @@ WalSndWaitForWal(XLogRecPtr loc)
/* Check for input from the client */ /* Check for input from the client */
ProcessRepliesIfAny(); ProcessRepliesIfAny();
/* Clear any already-pending wakeups */
ResetLatch(&MyWalSnd->latch);
/* Update our idea of the currently flushed position. */ /* Update our idea of the currently flushed position. */
if (!RecoveryInProgress()) if (!RecoveryInProgress())
RecentFlushPtr = GetFlushRecPtr(); RecentFlushPtr = GetFlushRecPtr();
...@@ -1244,15 +1245,12 @@ WalSndWaitForWal(XLogRecPtr loc) ...@@ -1244,15 +1245,12 @@ WalSndWaitForWal(XLogRecPtr loc)
wakeEvents |= WL_SOCKET_WRITEABLE; wakeEvents |= WL_SOCKET_WRITEABLE;
/* Sleep until something happens or we time out */ /* Sleep until something happens or we time out */
ImmediateInterruptOK = true; WaitLatchOrSocket(MyLatch, wakeEvents,
CHECK_FOR_INTERRUPTS();
WaitLatchOrSocket(&MyWalSnd->latch, wakeEvents,
MyProcPort->sock, sleeptime); MyProcPort->sock, sleeptime);
ImmediateInterruptOK = false;
} }
/* reactivate latch so WalSndLoop knows to continue */ /* reactivate latch so WalSndLoop knows to continue */
SetLatch(&MyWalSnd->latch); SetLatch(MyLatch);
return RecentFlushPtr; return RecentFlushPtr;
} }
...@@ -1813,6 +1811,11 @@ WalSndLoop(WalSndSendDataCallback send_data) ...@@ -1813,6 +1811,11 @@ WalSndLoop(WalSndSendDataCallback send_data)
if (!PostmasterIsAlive()) if (!PostmasterIsAlive())
exit(1); exit(1);
/* Clear any already-pending wakeups */
ResetLatch(MyLatch);
CHECK_FOR_INTERRUPTS();
/* Process any requests or signals received recently */ /* Process any requests or signals received recently */
if (got_SIGHUP) if (got_SIGHUP)
{ {
...@@ -1821,14 +1824,9 @@ WalSndLoop(WalSndSendDataCallback send_data) ...@@ -1821,14 +1824,9 @@ WalSndLoop(WalSndSendDataCallback send_data)
SyncRepInitConfig(); SyncRepInitConfig();
} }
CHECK_FOR_INTERRUPTS();
/* Check for input from the client */ /* Check for input from the client */
ProcessRepliesIfAny(); ProcessRepliesIfAny();
/* Clear any already-pending wakeups */
ResetLatch(&MyWalSnd->latch);
/* /*
* If we have received CopyDone from the client, sent CopyDone * If we have received CopyDone from the client, sent CopyDone
* ourselves, and the output buffer is empty, it's time to exit * ourselves, and the output buffer is empty, it's time to exit
...@@ -1912,11 +1910,8 @@ WalSndLoop(WalSndSendDataCallback send_data) ...@@ -1912,11 +1910,8 @@ WalSndLoop(WalSndSendDataCallback send_data)
wakeEvents |= WL_SOCKET_WRITEABLE; wakeEvents |= WL_SOCKET_WRITEABLE;
/* Sleep until something happens or we time out */ /* Sleep until something happens or we time out */
ImmediateInterruptOK = true; WaitLatchOrSocket(MyLatch, wakeEvents,
CHECK_FOR_INTERRUPTS();
WaitLatchOrSocket(&MyWalSnd->latch, wakeEvents,
MyProcPort->sock, sleeptime); MyProcPort->sock, sleeptime);
ImmediateInterruptOK = false;
} }
} }
return; return;
...@@ -1959,9 +1954,9 @@ InitWalSenderSlot(void) ...@@ -1959,9 +1954,9 @@ InitWalSenderSlot(void)
walsnd->pid = MyProcPid; walsnd->pid = MyProcPid;
walsnd->sentPtr = InvalidXLogRecPtr; walsnd->sentPtr = InvalidXLogRecPtr;
walsnd->state = WALSNDSTATE_STARTUP; walsnd->state = WALSNDSTATE_STARTUP;
walsnd->latch = &MyProc->procLatch;
SpinLockRelease(&walsnd->mutex); SpinLockRelease(&walsnd->mutex);
/* don't need the lock anymore */ /* don't need the lock anymore */
OwnLatch((Latch *) &walsnd->latch);
MyWalSnd = (WalSnd *) walsnd; MyWalSnd = (WalSnd *) walsnd;
break; break;
...@@ -1986,19 +1981,14 @@ WalSndKill(int code, Datum arg) ...@@ -1986,19 +1981,14 @@ WalSndKill(int code, Datum arg)
Assert(walsnd != NULL); Assert(walsnd != NULL);
/*
* Clear MyWalSnd first; then disown the latch. This is so that signal
* handlers won't try to touch the latch after it's no longer ours.
*/
MyWalSnd = NULL; MyWalSnd = NULL;
DisownLatch(&walsnd->latch); SpinLockAcquire(&walsnd->mutex);
/* clear latch while holding the spinlock, so it can safely be read */
/* walsnd->latch = NULL;
* Mark WalSnd struct no longer in use. Assume that no lock is required /* Mark WalSnd struct as no longer being in use. */
* for this.
*/
walsnd->pid = 0; walsnd->pid = 0;
SpinLockRelease(&walsnd->mutex);
} }
/* /*
...@@ -2570,8 +2560,8 @@ WalSndSigHupHandler(SIGNAL_ARGS) ...@@ -2570,8 +2560,8 @@ WalSndSigHupHandler(SIGNAL_ARGS)
int save_errno = errno; int save_errno = errno;
got_SIGHUP = true; got_SIGHUP = true;
if (MyWalSnd)
SetLatch(&MyWalSnd->latch); SetLatch(MyLatch);
errno = save_errno; errno = save_errno;
} }
...@@ -2603,8 +2593,7 @@ WalSndLastCycleHandler(SIGNAL_ARGS) ...@@ -2603,8 +2593,7 @@ WalSndLastCycleHandler(SIGNAL_ARGS)
kill(MyProcPid, SIGTERM); kill(MyProcPid, SIGTERM);
walsender_ready_to_stop = true; walsender_ready_to_stop = true;
if (MyWalSnd) SetLatch(MyLatch);
SetLatch(&MyWalSnd->latch);
errno = save_errno; errno = save_errno;
} }
...@@ -2668,7 +2657,6 @@ WalSndShmemInit(void) ...@@ -2668,7 +2657,6 @@ WalSndShmemInit(void)
WalSnd *walsnd = &WalSndCtl->walsnds[i]; WalSnd *walsnd = &WalSndCtl->walsnds[i];
SpinLockInit(&walsnd->mutex); SpinLockInit(&walsnd->mutex);
InitSharedLatch(&walsnd->latch);
} }
} }
} }
...@@ -2685,7 +2673,21 @@ WalSndWakeup(void) ...@@ -2685,7 +2673,21 @@ WalSndWakeup(void)
int i; int i;
for (i = 0; i < max_wal_senders; i++) for (i = 0; i < max_wal_senders; i++)
SetLatch(&WalSndCtl->walsnds[i].latch); {
Latch *latch;
WalSnd *walsnd = &WalSndCtl->walsnds[i];
/*
* Get latch pointer with spinlock held, for the unlikely case that
* pointer reads aren't atomic (as they're 8 bytes).
*/
SpinLockAcquire(&walsnd->mutex);
latch = walsnd->latch;
SpinLockRelease(&walsnd->mutex);
if (latch != NULL)
SetLatch(latch);
}
} }
/* Set state for current walsender (only called in walsender) */ /* Set state for current walsender (only called in walsender) */
......
...@@ -51,10 +51,10 @@ typedef struct WalSnd ...@@ -51,10 +51,10 @@ typedef struct WalSnd
slock_t mutex; slock_t mutex;
/* /*
* Latch used by backends to wake up this walsender when it has work to * Pointer to the walsender's latch. Used by backends to wake up this
* do. * walsender when it has work to do. NULL if the walsender isn't active.
*/ */
Latch latch; Latch *latch;
/* /*
* The priority order of the standby managed by this WALSender, as listed * The priority order of the standby managed by this WALSender, as listed
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment