Commit f83b5999 authored by Robert Haas's avatar Robert Haas

Make walsender more responsive.

Per testing by Andres Freund, this improves replication performance
and reduces replication latency and latency jitter.  I was a bit
concerned about moving more work into XLogInsert, but testing seems
to show that it's not a problem in practice.

Along the way, improve comments for WaitLatchOrSocket.

Andres Freund.  Review and stylistic cleanup by me.
parent 9ad45c18
...@@ -1042,13 +1042,6 @@ EndPrepare(GlobalTransaction gxact) ...@@ -1042,13 +1042,6 @@ EndPrepare(GlobalTransaction gxact)
/* If we crash now, we have prepared: WAL replay will fix things */ /* If we crash now, we have prepared: WAL replay will fix things */
/*
* Wake up all walsenders to send WAL up to the PREPARE record immediately
* if replication is enabled
*/
if (max_wal_senders > 0)
WalSndWakeup();
/* write correct CRC and close file */ /* write correct CRC and close file */
if ((write(fd, &statefile_crc, sizeof(pg_crc32))) != sizeof(pg_crc32)) if ((write(fd, &statefile_crc, sizeof(pg_crc32))) != sizeof(pg_crc32))
{ {
...@@ -2045,13 +2038,6 @@ RecordTransactionCommitPrepared(TransactionId xid, ...@@ -2045,13 +2038,6 @@ RecordTransactionCommitPrepared(TransactionId xid,
/* Flush XLOG to disk */ /* Flush XLOG to disk */
XLogFlush(recptr); XLogFlush(recptr);
/*
* Wake up all walsenders to send WAL up to the COMMIT PREPARED record
* immediately if replication is enabled
*/
if (max_wal_senders > 0)
WalSndWakeup();
/* Mark the transaction committed in pg_clog */ /* Mark the transaction committed in pg_clog */
TransactionIdCommitTree(xid, nchildren, children); TransactionIdCommitTree(xid, nchildren, children);
...@@ -2132,13 +2118,6 @@ RecordTransactionAbortPrepared(TransactionId xid, ...@@ -2132,13 +2118,6 @@ RecordTransactionAbortPrepared(TransactionId xid,
/* Always flush, since we're about to remove the 2PC state file */ /* Always flush, since we're about to remove the 2PC state file */
XLogFlush(recptr); XLogFlush(recptr);
/*
* Wake up all walsenders to send WAL up to the ABORT PREPARED record
* immediately if replication is enabled
*/
if (max_wal_senders > 0)
WalSndWakeup();
/* /*
* Mark the transaction aborted in clog. This is not absolutely necessary * Mark the transaction aborted in clog. This is not absolutely necessary
* but we may as well do it while we are here. * but we may as well do it while we are here.
......
...@@ -1141,13 +1141,6 @@ RecordTransactionCommit(void) ...@@ -1141,13 +1141,6 @@ RecordTransactionCommit(void)
XLogFlush(XactLastRecEnd); XLogFlush(XactLastRecEnd);
/*
* Wake up all walsenders to send WAL up to the COMMIT record
* immediately if replication is enabled
*/
if (max_wal_senders > 0)
WalSndWakeup();
/* /*
* Now we may update the CLOG, if we wrote a COMMIT record above * Now we may update the CLOG, if we wrote a COMMIT record above
*/ */
......
...@@ -1025,6 +1025,8 @@ begin:; ...@@ -1025,6 +1025,8 @@ begin:;
END_CRIT_SECTION(); END_CRIT_SECTION();
/* wakeup the WalSnd now that we released the WALWriteLock */
WalSndWakeupProcessRequests();
return RecPtr; return RecPtr;
} }
...@@ -1208,6 +1210,9 @@ begin:; ...@@ -1208,6 +1210,9 @@ begin:;
END_CRIT_SECTION(); END_CRIT_SECTION();
/* wakeup the WalSnd now that we outside contented locks */
WalSndWakeupProcessRequests();
return RecPtr; return RecPtr;
} }
...@@ -1792,6 +1797,10 @@ XLogWrite(XLogwrtRqst WriteRqst, bool flexible, bool xlog_switch) ...@@ -1792,6 +1797,10 @@ XLogWrite(XLogwrtRqst WriteRqst, bool flexible, bool xlog_switch)
if (finishing_seg || (xlog_switch && last_iteration)) if (finishing_seg || (xlog_switch && last_iteration))
{ {
issue_xlog_fsync(openLogFile, openLogSegNo); issue_xlog_fsync(openLogFile, openLogSegNo);
/* signal that we need to wakeup WalSnd later */
WalSndWakeupRequest();
LogwrtResult.Flush = LogwrtResult.Write; /* end of page */ LogwrtResult.Flush = LogwrtResult.Write; /* end of page */
if (XLogArchivingActive()) if (XLogArchivingActive())
...@@ -1854,7 +1863,11 @@ XLogWrite(XLogwrtRqst WriteRqst, bool flexible, bool xlog_switch) ...@@ -1854,7 +1863,11 @@ XLogWrite(XLogwrtRqst WriteRqst, bool flexible, bool xlog_switch)
openLogFile = XLogFileOpen(openLogSegNo); openLogFile = XLogFileOpen(openLogSegNo);
openLogOff = 0; openLogOff = 0;
} }
issue_xlog_fsync(openLogFile, openLogSegNo); issue_xlog_fsync(openLogFile, openLogSegNo);
/* signal that we need to wakeup WalSnd later */
WalSndWakeupRequest();
} }
LogwrtResult.Flush = LogwrtResult.Write; LogwrtResult.Flush = LogwrtResult.Write;
} }
...@@ -2120,6 +2133,9 @@ XLogFlush(XLogRecPtr record) ...@@ -2120,6 +2133,9 @@ XLogFlush(XLogRecPtr record)
END_CRIT_SECTION(); END_CRIT_SECTION();
/* wakeup the WalSnd now that we released the WALWriteLock */
WalSndWakeupProcessRequests();
/* /*
* If we still haven't flushed to the request point then we have a * If we still haven't flushed to the request point then we have a
* problem; most likely, the requested flush point is past end of XLOG. * problem; most likely, the requested flush point is past end of XLOG.
...@@ -2245,13 +2261,8 @@ XLogBackgroundFlush(void) ...@@ -2245,13 +2261,8 @@ XLogBackgroundFlush(void)
END_CRIT_SECTION(); END_CRIT_SECTION();
/* /* wakeup the WalSnd now that we released the WALWriteLock */
* If we wrote something then we have something to send to standbys also, WalSndWakeupProcessRequests();
* otherwise the replication delay become around 7s with just async
* commit.
*/
if (wrote_something)
WalSndWakeup();
return wrote_something; return wrote_something;
} }
......
...@@ -418,6 +418,9 @@ WaitLatchOrSocket(volatile Latch *latch, int wakeEvents, pgsocket sock, ...@@ -418,6 +418,9 @@ WaitLatchOrSocket(volatile Latch *latch, int wakeEvents, pgsocket sock,
* NB: when calling this in a signal handler, be sure to save and restore * NB: when calling this in a signal handler, be sure to save and restore
* errno around it. (That's standard practice in most signal handlers, of * errno around it. (That's standard practice in most signal handlers, of
* course, but we used to omit it in handlers that only set a flag.) * course, but we used to omit it in handlers that only set a flag.)
*
* NB: this function is called from critical sections and signal handlers so
* throwing an error is not a good idea.
*/ */
void void
SetLatch(volatile Latch *latch) SetLatch(volatile Latch *latch)
......
...@@ -247,6 +247,10 @@ WaitLatchOrSocket(volatile Latch *latch, int wakeEvents, pgsocket sock, ...@@ -247,6 +247,10 @@ WaitLatchOrSocket(volatile Latch *latch, int wakeEvents, pgsocket sock,
return result; return result;
} }
/*
* The comments above the unix implementation (unix_latch.c) of this function
* apply here as well.
*/
void void
SetLatch(volatile Latch *latch) SetLatch(volatile Latch *latch)
{ {
......
...@@ -81,6 +81,10 @@ bool am_cascading_walsender = false; /* Am I cascading WAL to ...@@ -81,6 +81,10 @@ bool am_cascading_walsender = false; /* Am I cascading WAL to
int max_wal_senders = 0; /* the maximum number of concurrent walsenders */ int max_wal_senders = 0; /* the maximum number of concurrent walsenders */
int replication_timeout = 60 * 1000; /* maximum time to send one int replication_timeout = 60 * 1000; /* maximum time to send one
* WAL data message */ * WAL data message */
/*
* State for WalSndWakeupRequest
*/
bool wake_wal_senders = false;
/* /*
* These variables are used similarly to openLogFile/Id/Seg/Off, * These variables are used similarly to openLogFile/Id/Seg/Off,
...@@ -1395,7 +1399,12 @@ WalSndShmemInit(void) ...@@ -1395,7 +1399,12 @@ WalSndShmemInit(void)
} }
} }
/* Wake up all walsenders */ /*
* Wake up all walsenders
*
* This will be called inside critical sections, so throwing an error is not
* adviseable.
*/
void void
WalSndWakeup(void) WalSndWakeup(void)
{ {
......
...@@ -21,6 +21,7 @@ extern bool am_walsender; ...@@ -21,6 +21,7 @@ extern bool am_walsender;
extern bool am_cascading_walsender; extern bool am_cascading_walsender;
extern volatile sig_atomic_t walsender_shutdown_requested; extern volatile sig_atomic_t walsender_shutdown_requested;
extern volatile sig_atomic_t walsender_ready_to_stop; extern volatile sig_atomic_t walsender_ready_to_stop;
extern bool wake_wal_senders;
/* user-settable parameters */ /* user-settable parameters */
extern int max_wal_senders; extern int max_wal_senders;
...@@ -35,4 +36,27 @@ extern void WalSndRqstFileReload(void); ...@@ -35,4 +36,27 @@ extern void WalSndRqstFileReload(void);
extern Datum pg_stat_get_wal_senders(PG_FUNCTION_ARGS); extern Datum pg_stat_get_wal_senders(PG_FUNCTION_ARGS);
/*
* Remember that we want to wakeup walsenders later
*
* This is separated from doing the actual wakeup because the writeout is done
* while holding contended locks.
*/
#define WalSndWakeupRequest() \
do { wake_wal_senders = true; } while (0)
/*
* wakeup walsenders if there is work to be done
*/
#define WalSndWakeupProcessRequests() \
do \
{ \
if (wake_wal_senders) \
{ \
wake_wal_senders = false; \
if (max_wal_senders > 0) \
WalSndWakeup(); \
} \
} while (0)
#endif /* _WALSENDER_H */ #endif /* _WALSENDER_H */
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment