Commit 6230912f authored by Thomas Munro's avatar Thomas Munro

Use FeBeWaitSet for walsender.c.

This avoids the need to set up and tear down a fresh WaitEventSet every
time we need need to wait.  We have to add an explicit exit on
postmaster exit (FeBeWaitSet isn't set up to do that automatically), so
move the code to do that into a new function to avoid repetition.

Reviewed-by: Kyotaro Horiguchi <horikyota.ntt@gmail.com> (earlier version)
Discussion: https://postgr.es/m/CA%2BhUKGJAC4Oqao%3DqforhNey20J8CiG2R%3DoBPqvfR0vOJrFysGw%40mail.gmail.com
parent a042ba2b
...@@ -244,6 +244,7 @@ static void WalSndKeepalive(bool requestReply); ...@@ -244,6 +244,7 @@ static void WalSndKeepalive(bool requestReply);
static void WalSndKeepaliveIfNecessary(void); static void WalSndKeepaliveIfNecessary(void);
static void WalSndCheckTimeOut(void); static void WalSndCheckTimeOut(void);
static long WalSndComputeSleeptime(TimestampTz now); static long WalSndComputeSleeptime(TimestampTz now);
static void WalSndWait(uint32 socket_events, long timeout, uint32 wait_event);
static void WalSndPrepareWrite(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid, bool last_write); static void WalSndPrepareWrite(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid, bool last_write);
static void WalSndWriteData(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid, bool last_write); static void WalSndWriteData(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid, bool last_write);
static void WalSndUpdateProgress(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid); static void WalSndUpdateProgress(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid);
...@@ -1287,7 +1288,6 @@ WalSndWriteData(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid, ...@@ -1287,7 +1288,6 @@ WalSndWriteData(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid,
/* If we have pending write here, go to slow path */ /* If we have pending write here, go to slow path */
for (;;) for (;;)
{ {
int wakeEvents;
long sleeptime; long sleeptime;
/* Check for input from the client */ /* Check for input from the client */
...@@ -1304,12 +1304,8 @@ WalSndWriteData(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid, ...@@ -1304,12 +1304,8 @@ WalSndWriteData(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid,
sleeptime = WalSndComputeSleeptime(GetCurrentTimestamp()); sleeptime = WalSndComputeSleeptime(GetCurrentTimestamp());
wakeEvents = WL_LATCH_SET | WL_EXIT_ON_PM_DEATH |
WL_SOCKET_WRITEABLE | WL_SOCKET_READABLE | WL_TIMEOUT;
/* Sleep until something happens or we time out */ /* Sleep until something happens or we time out */
(void) WaitLatchOrSocket(MyLatch, wakeEvents, WalSndWait(WL_SOCKET_WRITEABLE | WL_SOCKET_READABLE, sleeptime,
MyProcPort->sock, sleeptime,
WAIT_EVENT_WAL_SENDER_WRITE_DATA); WAIT_EVENT_WAL_SENDER_WRITE_DATA);
/* Clear any already-pending wakeups */ /* Clear any already-pending wakeups */
...@@ -1480,15 +1476,12 @@ WalSndWaitForWal(XLogRecPtr loc) ...@@ -1480,15 +1476,12 @@ WalSndWaitForWal(XLogRecPtr loc)
*/ */
sleeptime = WalSndComputeSleeptime(GetCurrentTimestamp()); sleeptime = WalSndComputeSleeptime(GetCurrentTimestamp());
wakeEvents = WL_LATCH_SET | WL_EXIT_ON_PM_DEATH | wakeEvents = WL_SOCKET_READABLE;
WL_SOCKET_READABLE | WL_TIMEOUT;
if (pq_is_send_pending()) if (pq_is_send_pending())
wakeEvents |= WL_SOCKET_WRITEABLE; wakeEvents |= WL_SOCKET_WRITEABLE;
(void) WaitLatchOrSocket(MyLatch, wakeEvents, WalSndWait(wakeEvents, sleeptime, WAIT_EVENT_WAL_SENDER_WAIT_WAL);
MyProcPort->sock, sleeptime,
WAIT_EVENT_WAL_SENDER_WAIT_WAL);
} }
/* reactivate latch so WalSndLoop knows to continue */ /* reactivate latch so WalSndLoop knows to continue */
...@@ -2348,10 +2341,10 @@ WalSndLoop(WalSndSendDataCallback send_data) ...@@ -2348,10 +2341,10 @@ WalSndLoop(WalSndSendDataCallback send_data)
long sleeptime; long sleeptime;
int wakeEvents; int wakeEvents;
wakeEvents = WL_LATCH_SET | WL_EXIT_ON_PM_DEATH | WL_TIMEOUT;
if (!streamingDoneReceiving) if (!streamingDoneReceiving)
wakeEvents |= WL_SOCKET_READABLE; wakeEvents = WL_SOCKET_READABLE;
else
wakeEvents = 0;
/* /*
* Use fresh timestamp, not last_processing, to reduce the chance * Use fresh timestamp, not last_processing, to reduce the chance
...@@ -2363,9 +2356,7 @@ WalSndLoop(WalSndSendDataCallback send_data) ...@@ -2363,9 +2356,7 @@ WalSndLoop(WalSndSendDataCallback send_data)
wakeEvents |= WL_SOCKET_WRITEABLE; wakeEvents |= WL_SOCKET_WRITEABLE;
/* Sleep until something happens or we time out */ /* Sleep until something happens or we time out */
(void) WaitLatchOrSocket(MyLatch, wakeEvents, WalSndWait(wakeEvents, sleeptime, WAIT_EVENT_WAL_SENDER_MAIN);
MyProcPort->sock, sleeptime,
WAIT_EVENT_WAL_SENDER_MAIN);
} }
} }
} }
...@@ -3121,6 +3112,22 @@ WalSndWakeup(void) ...@@ -3121,6 +3112,22 @@ WalSndWakeup(void)
} }
} }
/*
* Wait for readiness on the FeBe socket, or a timeout. The mask should be
* composed of optional WL_SOCKET_WRITEABLE and WL_SOCKET_READABLE flags. Exit
* on postmaster death.
*/
static void
WalSndWait(uint32 socket_events, long timeout, uint32 wait_event)
{
WaitEvent event;
ModifyWaitEvent(FeBeWaitSet, FeBeWaitSetSocketPos, socket_events, NULL);
if (WaitEventSetWait(FeBeWaitSet, timeout, &event, 1, wait_event) == 1 &&
(event.events & WL_POSTMASTER_DEATH))
proc_exit(1);
}
/* /*
* Signal all walsenders to move to stopping state. * Signal all walsenders to move to stopping state.
* *
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment