Commit 314cbfc5 authored by Robert Haas's avatar Robert Haas

Add new replication mode synchronous_commit = 'remote_apply'.

In this mode, the master waits for the transaction to be applied on
the remote side, not just written to disk.  That means that you can
count on a transaction started on the standby to see all commits
previously acknowledged by the master.

To make this work, the standby sends a reply after replaying each
commit record generated with synchronous_commit >= 'remote_apply'.
This introduces a small inefficiency: the extra replies will be sent
even by standbys that aren't the current synchronous standby.  But
previously-existing synchronous_commit levels make no attempt at all
to optimize which replies are sent based on what the primary cares
about, so this is no worse, and at least avoids any extra replies for
people not using the feature at all.

Thomas Munro, reviewed by Michael Paquier and by me.  Some additional
tweaks by me.
parent a898b409
...@@ -2143,8 +2143,8 @@ include_dir 'conf.d' ...@@ -2143,8 +2143,8 @@ include_dir 'conf.d'
Specifies whether transaction commit will wait for WAL records Specifies whether transaction commit will wait for WAL records
to be written to disk before the command returns a <quote>success</> to be written to disk before the command returns a <quote>success</>
indication to the client. Valid values are <literal>on</>, indication to the client. Valid values are <literal>on</>,
<literal>remote_write</>, <literal>local</>, and <literal>off</>. <literal>remote_write</>, <literal>remote_apply</>, <literal>local</>,
The default, and safe, setting and <literal>off</>. The default, and safe, setting
is <literal>on</>. When <literal>off</>, there can be a delay between is <literal>on</>. When <literal>off</>, there can be a delay between
when success is reported to the client and when the transaction is when success is reported to the client and when the transaction is
really guaranteed to be safe against a server crash. (The maximum really guaranteed to be safe against a server crash. (The maximum
...@@ -2169,6 +2169,10 @@ include_dir 'conf.d' ...@@ -2169,6 +2169,10 @@ include_dir 'conf.d'
the commit record of the transaction and flushed it to disk. This the commit record of the transaction and flushed it to disk. This
ensures the transaction will not be lost unless both primary and ensures the transaction will not be lost unless both primary and
standby suffer corruption of their database storage. standby suffer corruption of their database storage.
When set to <literal>remote_apply</>, commits will wait until a reply
from the current synchronous standby indicates it has received the
commit record of the transaction and applied it, so that it has become
visible to queries.
When set to <literal>remote_write</>, commits will wait When set to <literal>remote_write</>, commits will wait
until a reply from the current synchronous standby indicates it has until a reply from the current synchronous standby indicates it has
received the commit record of the transaction and written it out to received the commit record of the transaction and written it out to
...@@ -2186,9 +2190,9 @@ include_dir 'conf.d' ...@@ -2186,9 +2190,9 @@ include_dir 'conf.d'
setting <literal>local</> is available for transactions that setting <literal>local</> is available for transactions that
wish to wait for local flush to disk, but not synchronous replication. wish to wait for local flush to disk, but not synchronous replication.
If <varname>synchronous_standby_names</> is not set, the settings If <varname>synchronous_standby_names</> is not set, the settings
<literal>on</>, <literal>remote_write</> and <literal>local</> all <literal>on</>, <literal>remote_apply</>, <literal>remote_write</>
provide the same synchronization level: transaction commits only wait and <literal>local</> all provide the same synchronization level:
for local flush to disk. transaction commits only wait for local flush to disk.
</para> </para>
<para> <para>
This parameter can be changed at any time; the behavior for any This parameter can be changed at any time; the behavior for any
......
...@@ -1081,6 +1081,9 @@ primary_slot_name = 'node_a_slot' ...@@ -1081,6 +1081,9 @@ primary_slot_name = 'node_a_slot'
WAL record is then sent to the standby. The standby sends reply WAL record is then sent to the standby. The standby sends reply
messages each time a new batch of WAL data is written to disk, unless messages each time a new batch of WAL data is written to disk, unless
<varname>wal_receiver_status_interval</> is set to zero on the standby. <varname>wal_receiver_status_interval</> is set to zero on the standby.
In the case that <varname>synchronous_commit</> is set to
<literal>remote_apply</>, the standby sends reply messages when the commit
record is replayed, making the transaction visible.
If the standby is the first matching standby, as specified in If the standby is the first matching standby, as specified in
<varname>synchronous_standby_names</> on the primary, the reply <varname>synchronous_standby_names</> on the primary, the reply
messages from that standby will be used to wake users waiting for messages from that standby will be used to wake users waiting for
...@@ -1106,6 +1109,14 @@ primary_slot_name = 'node_a_slot' ...@@ -1106,6 +1109,14 @@ primary_slot_name = 'node_a_slot'
the database of the primary gets corrupted at the same time. the database of the primary gets corrupted at the same time.
</para> </para>
<para>
Setting <varname>synchronous_commit</> to <literal>remote_apply</> will
cause each commit to wait until the current synchronous standby reports
that it has replayed the transaction, making it visible to user queries.
In simple cases, this allows for load balancing with causal consistency
on a single hot standby.
</para>
<para> <para>
Users will stop waiting if a fast shutdown is requested. However, as Users will stop waiting if a fast shutdown is requested. However, as
when using asynchronous replication, the server will not fully when using asynchronous replication, the server will not fully
...@@ -1160,9 +1171,10 @@ primary_slot_name = 'node_a_slot' ...@@ -1160,9 +1171,10 @@ primary_slot_name = 'node_a_slot'
<title>Planning for High Availability</title> <title>Planning for High Availability</title>
<para> <para>
Commits made when <varname>synchronous_commit</> is set to <literal>on</> Commits made when <varname>synchronous_commit</> is set to <literal>on</>,
or <literal>remote_write</> will wait until the synchronous standby responds. The response <literal>remote_apply</> or <literal>remote_write</> will wait until the
may never occur if the last, or only, standby should crash. synchronous standby responds. The response may never occur if the last, or
only, standby should crash.
</para> </para>
<para> <para>
......
...@@ -1107,7 +1107,7 @@ EndPrepare(GlobalTransaction gxact) ...@@ -1107,7 +1107,7 @@ EndPrepare(GlobalTransaction gxact)
* Note that at this stage we have marked the prepare, but still show as * Note that at this stage we have marked the prepare, but still show as
* running in the procarray (twice!) and continue to hold locks. * running in the procarray (twice!) and continue to hold locks.
*/ */
SyncRepWaitForLSN(gxact->prepare_end_lsn); SyncRepWaitForLSN(gxact->prepare_end_lsn, false);
records.tail = records.head = NULL; records.tail = records.head = NULL;
records.num_chunks = 0; records.num_chunks = 0;
...@@ -2103,7 +2103,7 @@ RecordTransactionCommitPrepared(TransactionId xid, ...@@ -2103,7 +2103,7 @@ RecordTransactionCommitPrepared(TransactionId xid,
* Note that at this stage we have marked clog, but still show as running * Note that at this stage we have marked clog, but still show as running
* in the procarray and continue to hold locks. * in the procarray and continue to hold locks.
*/ */
SyncRepWaitForLSN(recptr); SyncRepWaitForLSN(recptr, true);
} }
/* /*
...@@ -2156,5 +2156,5 @@ RecordTransactionAbortPrepared(TransactionId xid, ...@@ -2156,5 +2156,5 @@ RecordTransactionAbortPrepared(TransactionId xid,
* Note that at this stage we have marked clog, but still show as running * Note that at this stage we have marked clog, but still show as running
* in the procarray and continue to hold locks. * in the procarray and continue to hold locks.
*/ */
SyncRepWaitForLSN(recptr); SyncRepWaitForLSN(recptr, false);
} }
...@@ -1324,7 +1324,7 @@ RecordTransactionCommit(void) ...@@ -1324,7 +1324,7 @@ RecordTransactionCommit(void)
* in the procarray and continue to hold locks. * in the procarray and continue to hold locks.
*/ */
if (wrote_xlog && markXidCommitted) if (wrote_xlog && markXidCommitted)
SyncRepWaitForLSN(XactLastRecEnd); SyncRepWaitForLSN(XactLastRecEnd, true);
/* remember end of last commit record */ /* remember end of last commit record */
XactLastCommitEnd = XactLastRecEnd; XactLastCommitEnd = XactLastRecEnd;
...@@ -5122,6 +5122,13 @@ XactLogCommitRecord(TimestampTz commit_time, ...@@ -5122,6 +5122,13 @@ XactLogCommitRecord(TimestampTz commit_time,
if (forceSyncCommit) if (forceSyncCommit)
xl_xinfo.xinfo |= XACT_COMPLETION_FORCE_SYNC_COMMIT; xl_xinfo.xinfo |= XACT_COMPLETION_FORCE_SYNC_COMMIT;
/*
* Check if the caller would like to ask standbys for immediate feedback
* once this commit is applied.
*/
if (synchronous_commit >= SYNCHRONOUS_COMMIT_REMOTE_APPLY)
xl_xinfo.xinfo |= XACT_COMPLETION_APPLY_FEEDBACK;
/* /*
* Relcache invalidations requires information about the current database * Relcache invalidations requires information about the current database
* and so does logical decoding. * and so does logical decoding.
...@@ -5459,6 +5466,13 @@ xact_redo_commit(xl_xact_parsed_commit *parsed, ...@@ -5459,6 +5466,13 @@ xact_redo_commit(xl_xact_parsed_commit *parsed,
if (XactCompletionForceSyncCommit(parsed->xinfo)) if (XactCompletionForceSyncCommit(parsed->xinfo))
XLogFlush(lsn); XLogFlush(lsn);
/*
* If asked by the primary (because someone is waiting for a synchronous
* commit = remote_apply), we will need to ask walreceiver to send a
* reply immediately.
*/
if (XactCompletionApplyFeedback(parsed->xinfo))
XLogRequestWalReceiverReply();
} }
/* /*
......
...@@ -345,6 +345,9 @@ static XLogRecPtr RedoRecPtr; ...@@ -345,6 +345,9 @@ static XLogRecPtr RedoRecPtr;
*/ */
static bool doPageWrites; static bool doPageWrites;
/* Has the recovery code requested a walreceiver wakeup? */
static bool doRequestWalReceiverReply;
/* /*
* RedoStartLSN points to the checkpoint's REDO location which is specified * RedoStartLSN points to the checkpoint's REDO location which is specified
* in a backup label file, backup history file or control file. In standby * in a backup label file, backup history file or control file. In standby
...@@ -6879,6 +6882,17 @@ StartupXLOG(void) ...@@ -6879,6 +6882,17 @@ StartupXLOG(void)
XLogCtl->lastReplayedTLI = ThisTimeLineID; XLogCtl->lastReplayedTLI = ThisTimeLineID;
SpinLockRelease(&XLogCtl->info_lck); SpinLockRelease(&XLogCtl->info_lck);
/*
* If rm_redo called XLogRequestWalReceiverReply, then we
* wake up the receiver so that it notices the updated
* lastReplayedEndRecPtr and sends a reply to the master.
*/
if (doRequestWalReceiverReply)
{
doRequestWalReceiverReply = false;
WalRcvForceReply();
}
/* Remember this record as the last-applied one */ /* Remember this record as the last-applied one */
LastRec = ReadRecPtr; LastRec = ReadRecPtr;
...@@ -11594,3 +11608,12 @@ SetWalWriterSleeping(bool sleeping) ...@@ -11594,3 +11608,12 @@ SetWalWriterSleeping(bool sleeping)
XLogCtl->WalWriterSleeping = sleeping; XLogCtl->WalWriterSleeping = sleeping;
SpinLockRelease(&XLogCtl->info_lck); SpinLockRelease(&XLogCtl->info_lck);
} }
/*
* Schedule a walreceiver wakeup in the main recovery loop.
*/
void
XLogRequestWalReceiverReply(void)
{
doRequestWalReceiverReply = true;
}
...@@ -16,14 +16,16 @@ bool walrcv_connect(char *conninfo, XLogRecPtr startpoint) ...@@ -16,14 +16,16 @@ bool walrcv_connect(char *conninfo, XLogRecPtr startpoint)
Establish connection to the primary, and starts streaming from 'startpoint'. Establish connection to the primary, and starts streaming from 'startpoint'.
Returns true on success. Returns true on success.
bool walrcv_receive(int timeout, unsigned char *type, char **buffer, int *len) int walrcv_receive(char **buffer, int *wait_fd)
Retrieve any message available through the connection, blocking for Retrieve any message available without blocking through the
maximum of 'timeout' ms. If a message was successfully read, returns true, connection. If a message was successfully read, returns its
otherwise false. On success, a pointer to the message payload is stored in length. If the connection is closed, returns -1. Otherwise returns 0
*buffer, length in *len, and the type of message received in *type. The to indicate that no data is available, and sets *wait_fd to a file
returned buffer is valid until the next call to walrcv_* functions, the descriptor which can be waited on before trying again. On success, a
caller should not attempt freeing it. pointer to the message payload is stored in *buffer. The returned
buffer is valid until the next call to walrcv_* functions, and the
caller should not attempt to free it.
void walrcv_send(const char *buffer, int nbytes) void walrcv_send(const char *buffer, int nbytes)
......
...@@ -52,7 +52,7 @@ static void libpqrcv_readtimelinehistoryfile(TimeLineID tli, char **filename, ch ...@@ -52,7 +52,7 @@ static void libpqrcv_readtimelinehistoryfile(TimeLineID tli, char **filename, ch
static bool libpqrcv_startstreaming(TimeLineID tli, XLogRecPtr startpoint, static bool libpqrcv_startstreaming(TimeLineID tli, XLogRecPtr startpoint,
char *slotname); char *slotname);
static void libpqrcv_endstreaming(TimeLineID *next_tli); static void libpqrcv_endstreaming(TimeLineID *next_tli);
static int libpqrcv_receive(int timeout, char **buffer); static int libpqrcv_receive(char **buffer, int *wait_fd);
static void libpqrcv_send(const char *buffer, int nbytes); static void libpqrcv_send(const char *buffer, int nbytes);
static void libpqrcv_disconnect(void); static void libpqrcv_disconnect(void);
...@@ -463,8 +463,7 @@ libpqrcv_disconnect(void) ...@@ -463,8 +463,7 @@ libpqrcv_disconnect(void)
} }
/* /*
* Receive a message available from XLOG stream, blocking for * Receive a message available from XLOG stream.
* maximum of 'timeout' ms.
* *
* Returns: * Returns:
* *
...@@ -472,15 +471,15 @@ libpqrcv_disconnect(void) ...@@ -472,15 +471,15 @@ libpqrcv_disconnect(void)
* point to a buffer holding the received message. The buffer is only valid * point to a buffer holding the received message. The buffer is only valid
* until the next libpqrcv_* call. * until the next libpqrcv_* call.
* *
* 0 if no data was available within timeout, or wait was interrupted * If no data was available immediately, returns 0, and *wait_fd is set to a
* by signal. * file descriptor which can be waited on before trying again.
* *
* -1 if the server ended the COPY. * -1 if the server ended the COPY.
* *
* ereports on error. * ereports on error.
*/ */
static int static int
libpqrcv_receive(int timeout, char **buffer) libpqrcv_receive(char **buffer, int *wait_fd)
{ {
int rawlen; int rawlen;
...@@ -492,16 +491,7 @@ libpqrcv_receive(int timeout, char **buffer) ...@@ -492,16 +491,7 @@ libpqrcv_receive(int timeout, char **buffer)
rawlen = PQgetCopyData(streamConn, &recvBuf, 1); rawlen = PQgetCopyData(streamConn, &recvBuf, 1);
if (rawlen == 0) if (rawlen == 0)
{ {
/* /* Try consuming some data. */
* No data available yet. If the caller requested to block, wait for
* more data to arrive.
*/
if (timeout > 0)
{
if (!libpq_select(timeout))
return 0;
}
if (PQconsumeInput(streamConn) == 0) if (PQconsumeInput(streamConn) == 0)
ereport(ERROR, ereport(ERROR,
(errmsg("could not receive data from WAL stream: %s", (errmsg("could not receive data from WAL stream: %s",
...@@ -510,8 +500,12 @@ libpqrcv_receive(int timeout, char **buffer) ...@@ -510,8 +500,12 @@ libpqrcv_receive(int timeout, char **buffer)
/* Now that we've consumed some input, try again */ /* Now that we've consumed some input, try again */
rawlen = PQgetCopyData(streamConn, &recvBuf, 1); rawlen = PQgetCopyData(streamConn, &recvBuf, 1);
if (rawlen == 0) if (rawlen == 0)
{
/* Tell caller to try again when our socket is ready. */
*wait_fd = PQsocket(streamConn);
return 0; return 0;
} }
}
if (rawlen == -1) /* end-of-streaming or error */ if (rawlen == -1) /* end-of-streaming or error */
{ {
PGresult *res; PGresult *res;
......
...@@ -91,13 +91,24 @@ static bool SyncRepQueueIsOrderedByLSN(int mode); ...@@ -91,13 +91,24 @@ static bool SyncRepQueueIsOrderedByLSN(int mode);
* to the wait queue. During SyncRepWakeQueue() a WALSender changes * to the wait queue. During SyncRepWakeQueue() a WALSender changes
* the state to SYNC_REP_WAIT_COMPLETE once replication is confirmed. * the state to SYNC_REP_WAIT_COMPLETE once replication is confirmed.
* This backend then resets its state to SYNC_REP_NOT_WAITING. * This backend then resets its state to SYNC_REP_NOT_WAITING.
*
* 'lsn' represents the LSN to wait for. 'commit' indicates whether this LSN
* represents a commit record. If it doesn't, then we wait only for the WAL
* to be flushed if synchronous_commit is set to the higher level of
* remote_apply, because only commit records provide apply feedback.
*/ */
void void
SyncRepWaitForLSN(XLogRecPtr XactCommitLSN) SyncRepWaitForLSN(XLogRecPtr lsn, bool commit)
{ {
char *new_status = NULL; char *new_status = NULL;
const char *old_status; const char *old_status;
int mode = SyncRepWaitMode; int mode;
/* Cap the level for anything other than commit to remote flush only. */
if (commit)
mode = SyncRepWaitMode;
else
mode = Min(SyncRepWaitMode, SYNC_REP_WAIT_FLUSH);
/* /*
* Fast exit if user has not requested sync replication, or there are no * Fast exit if user has not requested sync replication, or there are no
...@@ -122,7 +133,7 @@ SyncRepWaitForLSN(XLogRecPtr XactCommitLSN) ...@@ -122,7 +133,7 @@ SyncRepWaitForLSN(XLogRecPtr XactCommitLSN)
* to be a low cost check. * to be a low cost check.
*/ */
if (!WalSndCtl->sync_standbys_defined || if (!WalSndCtl->sync_standbys_defined ||
XactCommitLSN <= WalSndCtl->lsn[mode]) lsn <= WalSndCtl->lsn[mode])
{ {
LWLockRelease(SyncRepLock); LWLockRelease(SyncRepLock);
return; return;
...@@ -132,7 +143,7 @@ SyncRepWaitForLSN(XLogRecPtr XactCommitLSN) ...@@ -132,7 +143,7 @@ SyncRepWaitForLSN(XLogRecPtr XactCommitLSN)
* Set our waitLSN so WALSender will know when to wake us, and add * Set our waitLSN so WALSender will know when to wake us, and add
* ourselves to the queue. * ourselves to the queue.
*/ */
MyProc->waitLSN = XactCommitLSN; MyProc->waitLSN = lsn;
MyProc->syncRepState = SYNC_REP_WAITING; MyProc->syncRepState = SYNC_REP_WAITING;
SyncRepQueueInsert(mode); SyncRepQueueInsert(mode);
Assert(SyncRepQueueIsOrderedByLSN(mode)); Assert(SyncRepQueueIsOrderedByLSN(mode));
...@@ -147,7 +158,7 @@ SyncRepWaitForLSN(XLogRecPtr XactCommitLSN) ...@@ -147,7 +158,7 @@ SyncRepWaitForLSN(XLogRecPtr XactCommitLSN)
new_status = (char *) palloc(len + 32 + 1); new_status = (char *) palloc(len + 32 + 1);
memcpy(new_status, old_status, len); memcpy(new_status, old_status, len);
sprintf(new_status + len, " waiting for %X/%X", sprintf(new_status + len, " waiting for %X/%X",
(uint32) (XactCommitLSN >> 32), (uint32) XactCommitLSN); (uint32) (lsn >> 32), (uint32) lsn);
set_ps_display(new_status, false); set_ps_display(new_status, false);
new_status[len] = '\0'; /* truncate off " waiting ..." */ new_status[len] = '\0'; /* truncate off " waiting ..." */
} }
...@@ -416,6 +427,7 @@ SyncRepReleaseWaiters(void) ...@@ -416,6 +427,7 @@ SyncRepReleaseWaiters(void)
WalSnd *syncWalSnd; WalSnd *syncWalSnd;
int numwrite = 0; int numwrite = 0;
int numflush = 0; int numflush = 0;
int numapply = 0;
/* /*
* If this WALSender is serving a standby that is not on the list of * If this WALSender is serving a standby that is not on the list of
...@@ -462,12 +474,18 @@ SyncRepReleaseWaiters(void) ...@@ -462,12 +474,18 @@ SyncRepReleaseWaiters(void)
walsndctl->lsn[SYNC_REP_WAIT_FLUSH] = MyWalSnd->flush; walsndctl->lsn[SYNC_REP_WAIT_FLUSH] = MyWalSnd->flush;
numflush = SyncRepWakeQueue(false, SYNC_REP_WAIT_FLUSH); numflush = SyncRepWakeQueue(false, SYNC_REP_WAIT_FLUSH);
} }
if (walsndctl->lsn[SYNC_REP_WAIT_APPLY] < MyWalSnd->apply)
{
walsndctl->lsn[SYNC_REP_WAIT_APPLY] = MyWalSnd->apply;
numapply = SyncRepWakeQueue(false, SYNC_REP_WAIT_APPLY);
}
LWLockRelease(SyncRepLock); LWLockRelease(SyncRepLock);
elog(DEBUG3, "released %d procs up to write %X/%X, %d procs up to flush %X/%X", elog(DEBUG3, "released %d procs up to write %X/%X, %d procs up to flush %X/%X, %d procs up to apply %X/%x",
numwrite, (uint32) (MyWalSnd->write >> 32), (uint32) MyWalSnd->write, numwrite, (uint32) (MyWalSnd->write >> 32), (uint32) MyWalSnd->write,
numflush, (uint32) (MyWalSnd->flush >> 32), (uint32) MyWalSnd->flush); numflush, (uint32) (MyWalSnd->flush >> 32), (uint32) MyWalSnd->flush,
numapply, (uint32) (MyWalSnd->apply >> 32), (uint32) MyWalSnd->apply);
/* /*
* If we are managing the highest priority standby, though we weren't * If we are managing the highest priority standby, though we weren't
...@@ -728,6 +746,9 @@ assign_synchronous_commit(int newval, void *extra) ...@@ -728,6 +746,9 @@ assign_synchronous_commit(int newval, void *extra)
case SYNCHRONOUS_COMMIT_REMOTE_FLUSH: case SYNCHRONOUS_COMMIT_REMOTE_FLUSH:
SyncRepWaitMode = SYNC_REP_WAIT_FLUSH; SyncRepWaitMode = SYNC_REP_WAIT_FLUSH;
break; break;
case SYNCHRONOUS_COMMIT_REMOTE_APPLY:
SyncRepWaitMode = SYNC_REP_WAIT_APPLY;
break;
default: default:
SyncRepWaitMode = SYNC_REP_NO_WAIT; SyncRepWaitMode = SYNC_REP_NO_WAIT;
break; break;
......
...@@ -352,8 +352,6 @@ WalReceiverMain(void) ...@@ -352,8 +352,6 @@ WalReceiverMain(void)
if (walrcv_startstreaming(startpointTLI, startpoint, if (walrcv_startstreaming(startpointTLI, startpoint,
slotname[0] != '\0' ? slotname : NULL)) slotname[0] != '\0' ? slotname : NULL))
{ {
bool endofwal = false;
if (first_stream) if (first_stream)
ereport(LOG, ereport(LOG,
(errmsg("started streaming WAL from primary at %X/%X on timeline %u", (errmsg("started streaming WAL from primary at %X/%X on timeline %u",
...@@ -376,18 +374,13 @@ WalReceiverMain(void) ...@@ -376,18 +374,13 @@ WalReceiverMain(void)
ping_sent = false; ping_sent = false;
/* Loop until end-of-streaming or error */ /* Loop until end-of-streaming or error */
while (!endofwal) for (;;)
{ {
char *buf; char *buf;
int len; int len;
bool endofwal = false;
/* int wait_fd = PGINVALID_SOCKET;
* Emergency bailout if postmaster has died. This is to avoid int rc;
* the necessity for manual cleanup of all postmaster
* children.
*/
if (!PostmasterIsAlive())
exit(1);
/* /*
* Exit walreceiver if we're not in recovery. This should not * Exit walreceiver if we're not in recovery. This should not
...@@ -407,8 +400,8 @@ WalReceiverMain(void) ...@@ -407,8 +400,8 @@ WalReceiverMain(void)
XLogWalRcvSendHSFeedback(true); XLogWalRcvSendHSFeedback(true);
} }
/* Wait a while for data to arrive */ /* See if we can read data immediately */
len = walrcv_receive(NAPTIME_PER_CYCLE, &buf); len = walrcv_receive(&buf, &wait_fd);
if (len != 0) if (len != 0)
{ {
/* /*
...@@ -439,7 +432,7 @@ WalReceiverMain(void) ...@@ -439,7 +432,7 @@ WalReceiverMain(void)
endofwal = true; endofwal = true;
break; break;
} }
len = walrcv_receive(0, &buf); len = walrcv_receive(&buf, &wait_fd);
} }
/* Let the master know that we received some data. */ /* Let the master know that we received some data. */
...@@ -452,7 +445,54 @@ WalReceiverMain(void) ...@@ -452,7 +445,54 @@ WalReceiverMain(void)
*/ */
XLogWalRcvFlush(false); XLogWalRcvFlush(false);
} }
else
/* Check if we need to exit the streaming loop. */
if (endofwal)
break;
/*
* Ideally we would reuse a WaitEventSet object repeatedly
* here to avoid the overheads of WaitLatchOrSocket on epoll
* systems, but we can't be sure that libpq (or any other
* walreceiver implementation) has the same socket (even if
* the fd is the same number, it may have been closed and
* reopened since the last time). In future, if there is a
* function for removing sockets from WaitEventSet, then we
* could add and remove just the socket each time, potentially
* avoiding some system calls.
*/
Assert(wait_fd != PGINVALID_SOCKET);
rc = WaitLatchOrSocket(&walrcv->latch,
WL_POSTMASTER_DEATH | WL_SOCKET_READABLE |
WL_TIMEOUT | WL_LATCH_SET,
wait_fd,
NAPTIME_PER_CYCLE);
if (rc & WL_LATCH_SET)
{
ResetLatch(&walrcv->latch);
if (walrcv->force_reply)
{
/*
* The recovery process has asked us to send apply
* feedback now. Make sure the flag is really set to
* false in shared memory before sending the reply,
* so we don't miss a new request for a reply.
*/
walrcv->force_reply = false;
pg_memory_barrier();
XLogWalRcvSendReply(true, false);
}
}
if (rc & WL_POSTMASTER_DEATH)
{
/*
* Emergency bailout if postmaster has died. This is to
* avoid the necessity for manual cleanup of all
* postmaster children.
*/
exit(1);
}
if (rc & WL_TIMEOUT)
{ {
/* /*
* We didn't receive anything new. If we haven't heard * We didn't receive anything new. If we haven't heard
...@@ -1221,6 +1261,21 @@ ProcessWalSndrMessage(XLogRecPtr walEnd, TimestampTz sendTime) ...@@ -1221,6 +1261,21 @@ ProcessWalSndrMessage(XLogRecPtr walEnd, TimestampTz sendTime)
} }
} }
/*
* Wake up the walreceiver main loop.
*
* This is called by the startup process whenever interesting xlog records
* are applied, so that walreceiver can check if it needs to send an apply
* notification back to the master which may be waiting in a COMMIT with
* synchronous_commit = remote_apply.
*/
void
WalRcvForceReply(void)
{
WalRcv->force_reply = true;
SetLatch(&WalRcv->latch);
}
/* /*
* Return a string constant representing the state. This is used * Return a string constant representing the state. This is used
* in system functions and views, and should *not* be translated. * in system functions and views, and should *not* be translated.
......
...@@ -345,12 +345,13 @@ static const struct config_enum_entry constraint_exclusion_options[] = { ...@@ -345,12 +345,13 @@ static const struct config_enum_entry constraint_exclusion_options[] = {
}; };
/* /*
* Although only "on", "off", "remote_write", and "local" are documented, we * Although only "on", "off", "remote_apply", "remote_write", and "local" are
* accept all the likely variants of "on" and "off". * documented, we accept all the likely variants of "on" and "off".
*/ */
static const struct config_enum_entry synchronous_commit_options[] = { static const struct config_enum_entry synchronous_commit_options[] = {
{"local", SYNCHRONOUS_COMMIT_LOCAL_FLUSH, false}, {"local", SYNCHRONOUS_COMMIT_LOCAL_FLUSH, false},
{"remote_write", SYNCHRONOUS_COMMIT_REMOTE_WRITE, false}, {"remote_write", SYNCHRONOUS_COMMIT_REMOTE_WRITE, false},
{"remote_apply", SYNCHRONOUS_COMMIT_REMOTE_APPLY, false},
{"on", SYNCHRONOUS_COMMIT_ON, false}, {"on", SYNCHRONOUS_COMMIT_ON, false},
{"off", SYNCHRONOUS_COMMIT_OFF, false}, {"off", SYNCHRONOUS_COMMIT_OFF, false},
{"true", SYNCHRONOUS_COMMIT_ON, true}, {"true", SYNCHRONOUS_COMMIT_ON, true},
......
...@@ -177,7 +177,7 @@ ...@@ -177,7 +177,7 @@
# (change requires restart) # (change requires restart)
#fsync = on # turns forced synchronization on or off #fsync = on # turns forced synchronization on or off
#synchronous_commit = on # synchronization level; #synchronous_commit = on # synchronization level;
# off, local, remote_write, or on # off, local, remote_write, remote_apply, or on
#wal_sync_method = fsync # the default is the first option #wal_sync_method = fsync # the default is the first option
# supported by the operating system: # supported by the operating system:
# open_datasync # open_datasync
......
...@@ -60,7 +60,9 @@ typedef enum ...@@ -60,7 +60,9 @@ typedef enum
SYNCHRONOUS_COMMIT_LOCAL_FLUSH, /* wait for local flush only */ SYNCHRONOUS_COMMIT_LOCAL_FLUSH, /* wait for local flush only */
SYNCHRONOUS_COMMIT_REMOTE_WRITE, /* wait for local flush and remote SYNCHRONOUS_COMMIT_REMOTE_WRITE, /* wait for local flush and remote
* write */ * write */
SYNCHRONOUS_COMMIT_REMOTE_FLUSH /* wait for local and remote flush */ SYNCHRONOUS_COMMIT_REMOTE_FLUSH, /* wait for local and remote flush */
SYNCHRONOUS_COMMIT_REMOTE_APPLY /* wait for local flush and remote
* apply */
} SyncCommitLevel; } SyncCommitLevel;
/* Define the default setting for synchonous_commit */ /* Define the default setting for synchonous_commit */
...@@ -144,10 +146,13 @@ typedef void (*SubXactCallback) (SubXactEvent event, SubTransactionId mySubid, ...@@ -144,10 +146,13 @@ typedef void (*SubXactCallback) (SubXactEvent event, SubTransactionId mySubid,
* EOXact... routines which run at the end of the original transaction * EOXact... routines which run at the end of the original transaction
* completion. * completion.
*/ */
#define XACT_COMPLETION_APPLY_FEEDBACK (1U << 29)
#define XACT_COMPLETION_UPDATE_RELCACHE_FILE (1U << 30) #define XACT_COMPLETION_UPDATE_RELCACHE_FILE (1U << 30)
#define XACT_COMPLETION_FORCE_SYNC_COMMIT (1U << 31) #define XACT_COMPLETION_FORCE_SYNC_COMMIT (1U << 31)
/* Access macros for above flags */ /* Access macros for above flags */
#define XactCompletionApplyFeedback(xinfo) \
((xinfo & XACT_COMPLETION_APPLY_FEEDBACK) != 0)
#define XactCompletionRelcacheInitFileInval(xinfo) \ #define XactCompletionRelcacheInitFileInval(xinfo) \
((xinfo & XACT_COMPLETION_UPDATE_RELCACHE_FILE) != 0) ((xinfo & XACT_COMPLETION_UPDATE_RELCACHE_FILE) != 0)
#define XactCompletionForceSyncCommit(xinfo) \ #define XactCompletionForceSyncCommit(xinfo) \
......
...@@ -267,6 +267,8 @@ extern bool CheckPromoteSignal(void); ...@@ -267,6 +267,8 @@ extern bool CheckPromoteSignal(void);
extern void WakeupRecovery(void); extern void WakeupRecovery(void);
extern void SetWalWriterSleeping(bool sleeping); extern void SetWalWriterSleeping(bool sleeping);
extern void XLogRequestWalReceiverReply(void);
extern void assign_max_wal_size(int newval, void *extra); extern void assign_max_wal_size(int newval, void *extra);
extern void assign_checkpoint_completion_target(double newval, void *extra); extern void assign_checkpoint_completion_target(double newval, void *extra);
......
...@@ -31,7 +31,7 @@ ...@@ -31,7 +31,7 @@
/* /*
* Each page of XLOG file has a header like this: * Each page of XLOG file has a header like this:
*/ */
#define XLOG_PAGE_MAGIC 0xD088 /* can be used as WAL version indicator */ #define XLOG_PAGE_MAGIC 0xD089 /* can be used as WAL version indicator */
typedef struct XLogPageHeaderData typedef struct XLogPageHeaderData
{ {
......
...@@ -23,8 +23,9 @@ ...@@ -23,8 +23,9 @@
#define SYNC_REP_NO_WAIT -1 #define SYNC_REP_NO_WAIT -1
#define SYNC_REP_WAIT_WRITE 0 #define SYNC_REP_WAIT_WRITE 0
#define SYNC_REP_WAIT_FLUSH 1 #define SYNC_REP_WAIT_FLUSH 1
#define SYNC_REP_WAIT_APPLY 2
#define NUM_SYNC_REP_WAIT_MODE 2 #define NUM_SYNC_REP_WAIT_MODE 3
/* syncRepState */ /* syncRepState */
#define SYNC_REP_NOT_WAITING 0 #define SYNC_REP_NOT_WAITING 0
...@@ -35,7 +36,7 @@ ...@@ -35,7 +36,7 @@
extern char *SyncRepStandbyNames; extern char *SyncRepStandbyNames;
/* called by user backend */ /* called by user backend */
extern void SyncRepWaitForLSN(XLogRecPtr XactCommitLSN); extern void SyncRepWaitForLSN(XLogRecPtr lsn, bool commit);
/* called at backend exit */ /* called at backend exit */
extern void SyncRepCleanupAtProcExit(void); extern void SyncRepCleanupAtProcExit(void);
......
...@@ -112,10 +112,17 @@ typedef struct ...@@ -112,10 +112,17 @@ typedef struct
slock_t mutex; /* locks shared variables shown above */ slock_t mutex; /* locks shared variables shown above */
/*
* force walreceiver reply? This doesn't need to be locked; memory
* barriers for ordering are sufficient.
*/
bool force_reply;
/* /*
* Latch used by startup process to wake up walreceiver after telling it * Latch used by startup process to wake up walreceiver after telling it
* where to start streaming (after setting receiveStart and * where to start streaming (after setting receiveStart and
* receiveStartTLI). * receiveStartTLI), and also to tell it to send apply feedback to the
* primary whenever specially marked commit records are applied.
*/ */
Latch latch; Latch latch;
} WalRcvData; } WalRcvData;
...@@ -138,7 +145,7 @@ extern PGDLLIMPORT walrcv_startstreaming_type walrcv_startstreaming; ...@@ -138,7 +145,7 @@ extern PGDLLIMPORT walrcv_startstreaming_type walrcv_startstreaming;
typedef void (*walrcv_endstreaming_type) (TimeLineID *next_tli); typedef void (*walrcv_endstreaming_type) (TimeLineID *next_tli);
extern PGDLLIMPORT walrcv_endstreaming_type walrcv_endstreaming; extern PGDLLIMPORT walrcv_endstreaming_type walrcv_endstreaming;
typedef int (*walrcv_receive_type) (int timeout, char **buffer); typedef int (*walrcv_receive_type) (char **buffer, int *wait_fd);
extern PGDLLIMPORT walrcv_receive_type walrcv_receive; extern PGDLLIMPORT walrcv_receive_type walrcv_receive;
typedef void (*walrcv_send_type) (const char *buffer, int nbytes); typedef void (*walrcv_send_type) (const char *buffer, int nbytes);
...@@ -162,5 +169,6 @@ extern void RequestXLogStreaming(TimeLineID tli, XLogRecPtr recptr, ...@@ -162,5 +169,6 @@ extern void RequestXLogStreaming(TimeLineID tli, XLogRecPtr recptr,
extern XLogRecPtr GetWalRcvWriteRecPtr(XLogRecPtr *latestChunkStart, TimeLineID *receiveTLI); extern XLogRecPtr GetWalRcvWriteRecPtr(XLogRecPtr *latestChunkStart, TimeLineID *receiveTLI);
extern int GetReplicationApplyDelay(void); extern int GetReplicationApplyDelay(void);
extern int GetReplicationTransferLatency(void); extern int GetReplicationTransferLatency(void);
extern void WalRcvForceReply(void);
#endif /* _WALRECEIVER_H */ #endif /* _WALRECEIVER_H */
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment