Commit 64233902 authored by Simon Riggs's avatar Simon Riggs

Send new protocol keepalive messages to standby servers.

Allows streaming replication users to calculate transfer latency
and apply delay via internal functions. No external functions yet.
parent 2ae2e9c0
...@@ -1463,6 +1463,54 @@ The commands accepted in walsender mode are: ...@@ -1463,6 +1463,54 @@ The commands accepted in walsender mode are:
CopyData message): CopyData message):
</para> </para>
<para>
<variablelist>
<varlistentry>
<term>
Primary keepalive message (B)
</term>
<listitem>
<para>
<variablelist>
<varlistentry>
<term>
Byte1('k')
</term>
<listitem>
<para>
Identifies the message as a sender keepalive.
</para>
</listitem>
</varlistentry>
<varlistentry>
<term>
Byte8
</term>
<listitem>
<para>
The current end of WAL on the server, given in
XLogRecPtr format.
</para>
</listitem>
</varlistentry>
<varlistentry>
<term>
Byte8
</term>
<listitem>
<para>
The server's system clock at the time of transmission,
given in TimestampTz format.
</para>
</listitem>
</varlistentry>
</variablelist>
</para>
</listitem>
</varlistentry>
</variablelist>
</para>
<para> <para>
<variablelist> <variablelist>
<varlistentry> <varlistentry>
......
...@@ -452,6 +452,9 @@ typedef struct XLogCtlData ...@@ -452,6 +452,9 @@ typedef struct XLogCtlData
XLogRecPtr recoveryLastRecPtr; XLogRecPtr recoveryLastRecPtr;
/* timestamp of last COMMIT/ABORT record replayed (or being replayed) */ /* timestamp of last COMMIT/ABORT record replayed (or being replayed) */
TimestampTz recoveryLastXTime; TimestampTz recoveryLastXTime;
/* timestamp of when we started replaying the current chunk of WAL data,
* only relevant for replication or archive recovery */
TimestampTz currentChunkStartTime;
/* end of the last record restored from the archive */ /* end of the last record restored from the archive */
XLogRecPtr restoreLastRecPtr; XLogRecPtr restoreLastRecPtr;
/* Are we requested to pause recovery? */ /* Are we requested to pause recovery? */
...@@ -606,6 +609,7 @@ static void exitArchiveRecovery(TimeLineID endTLI, ...@@ -606,6 +609,7 @@ static void exitArchiveRecovery(TimeLineID endTLI,
static bool recoveryStopsHere(XLogRecord *record, bool *includeThis); static bool recoveryStopsHere(XLogRecord *record, bool *includeThis);
static void recoveryPausesHere(void); static void recoveryPausesHere(void);
static void SetLatestXTime(TimestampTz xtime); static void SetLatestXTime(TimestampTz xtime);
static void SetCurrentChunkStartTime(TimestampTz xtime);
static void CheckRequiredParameterValues(void); static void CheckRequiredParameterValues(void);
static void XLogReportParameters(void); static void XLogReportParameters(void);
static void LocalSetXLogInsertAllowed(void); static void LocalSetXLogInsertAllowed(void);
...@@ -5847,6 +5851,41 @@ GetLatestXTime(void) ...@@ -5847,6 +5851,41 @@ GetLatestXTime(void)
return xtime; return xtime;
} }
/*
* Save timestamp of the next chunk of WAL records to apply.
*
* We keep this in XLogCtl, not a simple static variable, so that it can be
* seen by all backends.
*/
static void
SetCurrentChunkStartTime(TimestampTz xtime)
{
/* use volatile pointer to prevent code rearrangement */
volatile XLogCtlData *xlogctl = XLogCtl;
SpinLockAcquire(&xlogctl->info_lck);
xlogctl->currentChunkStartTime = xtime;
SpinLockRelease(&xlogctl->info_lck);
}
/*
* Fetch timestamp of latest processed commit/abort record.
* Startup process maintains an accurate local copy in XLogReceiptTime
*/
TimestampTz
GetCurrentChunkReplayStartTime(void)
{
/* use volatile pointer to prevent code rearrangement */
volatile XLogCtlData *xlogctl = XLogCtl;
TimestampTz xtime;
SpinLockAcquire(&xlogctl->info_lck);
xtime = xlogctl->currentChunkStartTime;
SpinLockRelease(&xlogctl->info_lck);
return xtime;
}
/* /*
* Returns time of receipt of current chunk of XLOG data, as well as * Returns time of receipt of current chunk of XLOG data, as well as
* whether it was received from streaming replication or from archives. * whether it was received from streaming replication or from archives.
...@@ -6390,6 +6429,7 @@ StartupXLOG(void) ...@@ -6390,6 +6429,7 @@ StartupXLOG(void)
xlogctl->replayEndRecPtr = ReadRecPtr; xlogctl->replayEndRecPtr = ReadRecPtr;
xlogctl->recoveryLastRecPtr = ReadRecPtr; xlogctl->recoveryLastRecPtr = ReadRecPtr;
xlogctl->recoveryLastXTime = 0; xlogctl->recoveryLastXTime = 0;
xlogctl->currentChunkStartTime = 0;
xlogctl->recoveryPause = false; xlogctl->recoveryPause = false;
SpinLockRelease(&xlogctl->info_lck); SpinLockRelease(&xlogctl->info_lck);
...@@ -9696,7 +9736,10 @@ retry: ...@@ -9696,7 +9736,10 @@ retry:
{ {
havedata = true; havedata = true;
if (!XLByteLT(*RecPtr, latestChunkStart)) if (!XLByteLT(*RecPtr, latestChunkStart))
{
XLogReceiptTime = GetCurrentTimestamp(); XLogReceiptTime = GetCurrentTimestamp();
SetCurrentChunkStartTime(XLogReceiptTime);
}
} }
else else
havedata = false; havedata = false;
......
...@@ -124,6 +124,7 @@ static void XLogWalRcvWrite(char *buf, Size nbytes, XLogRecPtr recptr); ...@@ -124,6 +124,7 @@ static void XLogWalRcvWrite(char *buf, Size nbytes, XLogRecPtr recptr);
static void XLogWalRcvFlush(bool dying); static void XLogWalRcvFlush(bool dying);
static void XLogWalRcvSendReply(void); static void XLogWalRcvSendReply(void);
static void XLogWalRcvSendHSFeedback(void); static void XLogWalRcvSendHSFeedback(void);
static void ProcessWalSndrMessage(XLogRecPtr walEnd, TimestampTz sendTime);
/* Signal handlers */ /* Signal handlers */
static void WalRcvSigHupHandler(SIGNAL_ARGS); static void WalRcvSigHupHandler(SIGNAL_ARGS);
...@@ -218,6 +219,10 @@ WalReceiverMain(void) ...@@ -218,6 +219,10 @@ WalReceiverMain(void)
/* Fetch information required to start streaming */ /* Fetch information required to start streaming */
strlcpy(conninfo, (char *) walrcv->conninfo, MAXCONNINFO); strlcpy(conninfo, (char *) walrcv->conninfo, MAXCONNINFO);
startpoint = walrcv->receiveStart; startpoint = walrcv->receiveStart;
/* Initialise to a sanish value */
walrcv->lastMsgSendTime = walrcv->lastMsgReceiptTime = GetCurrentTimestamp();
SpinLockRelease(&walrcv->mutex); SpinLockRelease(&walrcv->mutex);
/* Arrange to clean up at walreceiver exit */ /* Arrange to clean up at walreceiver exit */
...@@ -433,12 +438,28 @@ XLogWalRcvProcessMsg(unsigned char type, char *buf, Size len) ...@@ -433,12 +438,28 @@ XLogWalRcvProcessMsg(unsigned char type, char *buf, Size len)
errmsg_internal("invalid WAL message received from primary"))); errmsg_internal("invalid WAL message received from primary")));
/* memcpy is required here for alignment reasons */ /* memcpy is required here for alignment reasons */
memcpy(&msghdr, buf, sizeof(WalDataMessageHeader)); memcpy(&msghdr, buf, sizeof(WalDataMessageHeader));
ProcessWalSndrMessage(msghdr.walEnd, msghdr.sendTime);
buf += sizeof(WalDataMessageHeader); buf += sizeof(WalDataMessageHeader);
len -= sizeof(WalDataMessageHeader); len -= sizeof(WalDataMessageHeader);
XLogWalRcvWrite(buf, len, msghdr.dataStart); XLogWalRcvWrite(buf, len, msghdr.dataStart);
break; break;
} }
case 'k': /* Keepalive */
{
PrimaryKeepaliveMessage keepalive;
if (len != sizeof(PrimaryKeepaliveMessage))
ereport(ERROR,
(errcode(ERRCODE_PROTOCOL_VIOLATION),
errmsg_internal("invalid keepalive message received from primary")));
/* memcpy is required here for alignment reasons */
memcpy(&keepalive, buf, sizeof(PrimaryKeepaliveMessage));
ProcessWalSndrMessage(keepalive.walEnd, keepalive.sendTime);
break;
}
default: default:
ereport(ERROR, ereport(ERROR,
(errcode(ERRCODE_PROTOCOL_VIOLATION), (errcode(ERRCODE_PROTOCOL_VIOLATION),
...@@ -711,3 +732,27 @@ XLogWalRcvSendHSFeedback(void) ...@@ -711,3 +732,27 @@ XLogWalRcvSendHSFeedback(void)
memcpy(&buf[1], &feedback_message, sizeof(StandbyHSFeedbackMessage)); memcpy(&buf[1], &feedback_message, sizeof(StandbyHSFeedbackMessage));
walrcv_send(buf, sizeof(StandbyHSFeedbackMessage) + 1); walrcv_send(buf, sizeof(StandbyHSFeedbackMessage) + 1);
} }
/*
* Keep track of important messages from primary.
*/
static void
ProcessWalSndrMessage(XLogRecPtr walEnd, TimestampTz sendTime)
{
/* use volatile pointer to prevent code rearrangement */
volatile WalRcvData *walrcv = WalRcv;
TimestampTz lastMsgReceiptTime = GetCurrentTimestamp();
/* Update shared-memory status */
SpinLockAcquire(&walrcv->mutex);
walrcv->lastMsgSendTime = sendTime;
walrcv->lastMsgReceiptTime = lastMsgReceiptTime;
SpinLockRelease(&walrcv->mutex);
elog(DEBUG2, "sendtime %s receipttime %s replication apply delay %d transfer latency %d",
timestamptz_to_str(sendTime),
timestamptz_to_str(lastMsgReceiptTime),
GetReplicationApplyDelay(),
GetReplicationTransferLatency());
}
...@@ -28,6 +28,7 @@ ...@@ -28,6 +28,7 @@
#include "replication/walreceiver.h" #include "replication/walreceiver.h"
#include "storage/pmsignal.h" #include "storage/pmsignal.h"
#include "storage/shmem.h" #include "storage/shmem.h"
#include "utils/timestamp.h"
WalRcvData *WalRcv = NULL; WalRcvData *WalRcv = NULL;
...@@ -238,3 +239,65 @@ GetWalRcvWriteRecPtr(XLogRecPtr *latestChunkStart) ...@@ -238,3 +239,65 @@ GetWalRcvWriteRecPtr(XLogRecPtr *latestChunkStart)
return recptr; return recptr;
} }
/*
* Returns the replication apply delay in ms
*/
int
GetReplicationApplyDelay(void)
{
/* use volatile pointer to prevent code rearrangement */
volatile WalRcvData *walrcv = WalRcv;
XLogRecPtr receivePtr;
XLogRecPtr replayPtr;
long secs;
int usecs;
SpinLockAcquire(&walrcv->mutex);
receivePtr = walrcv->receivedUpto;
SpinLockRelease(&walrcv->mutex);
replayPtr = GetXLogReplayRecPtr(NULL);
if (XLByteLE(receivePtr, replayPtr))
return 0;
TimestampDifference(GetCurrentChunkReplayStartTime(),
GetCurrentTimestamp(),
&secs, &usecs);
return (((int) secs * 1000) + (usecs / 1000));
}
/*
* Returns the network latency in ms, note that this includes any
* difference in clock settings between the servers, as well as timezone.
*/
int
GetReplicationTransferLatency(void)
{
/* use volatile pointer to prevent code rearrangement */
volatile WalRcvData *walrcv = WalRcv;
TimestampTz lastMsgSendTime;
TimestampTz lastMsgReceiptTime;
long secs = 0;
int usecs = 0;
int ms;
SpinLockAcquire(&walrcv->mutex);
lastMsgSendTime = walrcv->lastMsgSendTime;
lastMsgReceiptTime = walrcv->lastMsgReceiptTime;
SpinLockRelease(&walrcv->mutex);
TimestampDifference(lastMsgSendTime,
lastMsgReceiptTime,
&secs, &usecs);
ms = ((int) secs * 1000) + (usecs / 1000);
return ms;
}
...@@ -131,6 +131,7 @@ static void ProcessStandbyMessage(void); ...@@ -131,6 +131,7 @@ static void ProcessStandbyMessage(void);
static void ProcessStandbyReplyMessage(void); static void ProcessStandbyReplyMessage(void);
static void ProcessStandbyHSFeedbackMessage(void); static void ProcessStandbyHSFeedbackMessage(void);
static void ProcessRepliesIfAny(void); static void ProcessRepliesIfAny(void);
static void WalSndKeepalive(char *msgbuf);
/* Main entry point for walsender process */ /* Main entry point for walsender process */
...@@ -823,30 +824,24 @@ WalSndLoop(void) ...@@ -823,30 +824,24 @@ WalSndLoop(void)
*/ */
if (caughtup || pq_is_send_pending()) if (caughtup || pq_is_send_pending())
{ {
TimestampTz finish_time = 0; TimestampTz timeout = 0;
long sleeptime = -1; long sleeptime = 10000; /* 10 s */
int wakeEvents; int wakeEvents;
wakeEvents = WL_LATCH_SET | WL_POSTMASTER_DEATH | wakeEvents = WL_LATCH_SET | WL_POSTMASTER_DEATH |
WL_SOCKET_READABLE; WL_SOCKET_READABLE | WL_TIMEOUT;
if (pq_is_send_pending()) if (pq_is_send_pending())
wakeEvents |= WL_SOCKET_WRITEABLE; wakeEvents |= WL_SOCKET_WRITEABLE;
else
WalSndKeepalive(output_message);
/* Determine time until replication timeout */ /* Determine time until replication timeout */
if (replication_timeout > 0) if (replication_timeout > 0)
{ {
long secs; timeout = TimestampTzPlusMilliseconds(last_reply_timestamp,
int usecs;
finish_time = TimestampTzPlusMilliseconds(last_reply_timestamp,
replication_timeout); replication_timeout);
TimestampDifference(GetCurrentTimestamp(), sleeptime = 1 + (replication_timeout / 10);
finish_time, &secs, &usecs);
sleeptime = secs * 1000 + usecs / 1000;
/* Avoid Assert in WaitLatchOrSocket if timeout is past */
if (sleeptime < 0)
sleeptime = 0;
wakeEvents |= WL_TIMEOUT;
} }
/* Sleep until something happens or replication timeout */ /* Sleep until something happens or replication timeout */
...@@ -859,7 +854,7 @@ WalSndLoop(void) ...@@ -859,7 +854,7 @@ WalSndLoop(void)
* timeout ... he's supposed to reply *before* that. * timeout ... he's supposed to reply *before* that.
*/ */
if (replication_timeout > 0 && if (replication_timeout > 0 &&
GetCurrentTimestamp() >= finish_time) GetCurrentTimestamp() >= timeout)
{ {
/* /*
* Since typically expiration of replication timeout means * Since typically expiration of replication timeout means
...@@ -1627,6 +1622,23 @@ pg_stat_get_wal_senders(PG_FUNCTION_ARGS) ...@@ -1627,6 +1622,23 @@ pg_stat_get_wal_senders(PG_FUNCTION_ARGS)
return (Datum) 0; return (Datum) 0;
} }
static void
WalSndKeepalive(char *msgbuf)
{
PrimaryKeepaliveMessage keepalive_message;
/* Construct a new message */
keepalive_message.walEnd = sentPtr;
keepalive_message.sendTime = GetCurrentTimestamp();
elog(DEBUG2, "sending replication keepalive");
/* Prepend with the message type and send it. */
msgbuf[0] = 'k';
memcpy(msgbuf + 1, &keepalive_message, sizeof(PrimaryKeepaliveMessage));
pq_putmessage_noblock('d', msgbuf, sizeof(PrimaryKeepaliveMessage) + 1);
}
/* /*
* This isn't currently used for anything. Monitoring tools might be * This isn't currently used for anything. Monitoring tools might be
* interested in the future, and we'll need something like this in the * interested in the future, and we'll need something like this in the
......
...@@ -293,6 +293,7 @@ extern XLogRecPtr GetXLogWriteRecPtr(void); ...@@ -293,6 +293,7 @@ extern XLogRecPtr GetXLogWriteRecPtr(void);
extern bool RecoveryIsPaused(void); extern bool RecoveryIsPaused(void);
extern void SetRecoveryPause(bool recoveryPause); extern void SetRecoveryPause(bool recoveryPause);
extern TimestampTz GetLatestXTime(void); extern TimestampTz GetLatestXTime(void);
extern TimestampTz GetCurrentChunkReplayStartTime(void);
extern void UpdateControlFile(void); extern void UpdateControlFile(void);
extern uint64 GetSystemIdentifier(void); extern uint64 GetSystemIdentifier(void);
......
...@@ -16,6 +16,20 @@ ...@@ -16,6 +16,20 @@
#include "datatype/timestamp.h" #include "datatype/timestamp.h"
/*
* All messages from WalSender must contain these fields to allow us to
* correctly calculate the replication delay.
*/
typedef struct
{
/* Current end of WAL on the sender */
XLogRecPtr walEnd;
/* Sender's system clock at the time of transmission */
TimestampTz sendTime;
} WalSndrMessage;
/* /*
* Header for a WAL data message (message type 'w'). This is wrapped within * Header for a WAL data message (message type 'w'). This is wrapped within
* a CopyData message at the FE/BE protocol level. * a CopyData message at the FE/BE protocol level.
...@@ -39,6 +53,14 @@ typedef struct ...@@ -39,6 +53,14 @@ typedef struct
TimestampTz sendTime; TimestampTz sendTime;
} WalDataMessageHeader; } WalDataMessageHeader;
/*
* Keepalive message from primary (message type 'k'). (lowercase k)
* This is wrapped within a CopyData message at the FE/BE protocol level.
*
* Note that the data length is not specified here.
*/
typedef WalSndrMessage PrimaryKeepaliveMessage;
/* /*
* Reply message from standby (message type 'r'). This is wrapped within * Reply message from standby (message type 'r'). This is wrapped within
* a CopyData message at the FE/BE protocol level. * a CopyData message at the FE/BE protocol level.
......
...@@ -78,6 +78,12 @@ typedef struct ...@@ -78,6 +78,12 @@ typedef struct
*/ */
XLogRecPtr latestChunkStart; XLogRecPtr latestChunkStart;
/*
* Time of send and receive of any message received.
*/
TimestampTz lastMsgSendTime;
TimestampTz lastMsgReceiptTime;
/* /*
* connection string; is used for walreceiver to connect with the primary. * connection string; is used for walreceiver to connect with the primary.
*/ */
...@@ -112,5 +118,7 @@ extern void ShutdownWalRcv(void); ...@@ -112,5 +118,7 @@ extern void ShutdownWalRcv(void);
extern bool WalRcvInProgress(void); extern bool WalRcvInProgress(void);
extern void RequestXLogStreaming(XLogRecPtr recptr, const char *conninfo); extern void RequestXLogStreaming(XLogRecPtr recptr, const char *conninfo);
extern XLogRecPtr GetWalRcvWriteRecPtr(XLogRecPtr *latestChunkStart); extern XLogRecPtr GetWalRcvWriteRecPtr(XLogRecPtr *latestChunkStart);
extern int GetReplicationApplyDelay(void);
extern int GetReplicationTransferLatency(void);
#endif /* _WALRECEIVER_H */ #endif /* _WALRECEIVER_H */
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment