Commit 7fee252f authored by Michael Paquier

Add timestamp of last received message from standby to pg_stat_replication

The timestamp generated by the standby at message transmission has been
included in the protocol since its introduction for both the status
update message and hot standby feedback message, but it has never
appeared in pg_stat_replication.  Seeing this timestamp does not matter
much with a cluster which has a lot of activity, but on a mostly-idle
cluster, this makes monitoring able to react faster than the configured
timeouts.

Author: MyungKyu LIM
Reviewed-by: Michael Paquier, Masahiko Sawada
Discussion: https://postgr.es/m/1657809367.407321.1533027417725.JavaMail.jboss@ep2ml404
parent 1f66c657
...@@ -1920,6 +1920,11 @@ SELECT pid, wait_event_type, wait_event FROM pg_stat_activity WHERE wait_event i ...@@ -1920,6 +1920,11 @@ SELECT pid, wait_event_type, wait_event FROM pg_stat_activity WHERE wait_event i
</itemizedlist> </itemizedlist>
</entry> </entry>
</row> </row>
<row>
<entry><structfield>reply_time</structfield></entry>
<entry><type>timestamp with time zone</type></entry>
<entry>Send time of last reply message received from standby server</entry>
</row>
</tbody> </tbody>
</tgroup> </tgroup>
</table> </table>
......
...@@ -734,7 +734,8 @@ CREATE VIEW pg_stat_replication AS ...@@ -734,7 +734,8 @@ CREATE VIEW pg_stat_replication AS
W.flush_lag, W.flush_lag,
W.replay_lag, W.replay_lag,
W.sync_priority, W.sync_priority,
W.sync_state W.sync_state,
W.reply_time
FROM pg_stat_get_activity(NULL) AS S FROM pg_stat_get_activity(NULL) AS S
JOIN pg_stat_get_wal_senders() AS W ON (S.pid = W.pid) JOIN pg_stat_get_wal_senders() AS W ON (S.pid = W.pid)
LEFT JOIN pg_authid AS U ON (S.usesysid = U.oid); LEFT JOIN pg_authid AS U ON (S.usesysid = U.oid);
......
...@@ -1763,6 +1763,7 @@ ProcessStandbyReplyMessage(void) ...@@ -1763,6 +1763,7 @@ ProcessStandbyReplyMessage(void)
applyLag; applyLag;
bool clearLagTimes; bool clearLagTimes;
TimestampTz now; TimestampTz now;
TimestampTz replyTime;
static bool fullyAppliedLastTime = false; static bool fullyAppliedLastTime = false;
...@@ -1770,14 +1771,25 @@ ProcessStandbyReplyMessage(void) ...@@ -1770,14 +1771,25 @@ ProcessStandbyReplyMessage(void)
writePtr = pq_getmsgint64(&reply_message); writePtr = pq_getmsgint64(&reply_message);
flushPtr = pq_getmsgint64(&reply_message); flushPtr = pq_getmsgint64(&reply_message);
applyPtr = pq_getmsgint64(&reply_message); applyPtr = pq_getmsgint64(&reply_message);
(void) pq_getmsgint64(&reply_message); /* sendTime; not used ATM */ replyTime = pq_getmsgint64(&reply_message);
replyRequested = pq_getmsgbyte(&reply_message); replyRequested = pq_getmsgbyte(&reply_message);
elog(DEBUG2, "write %X/%X flush %X/%X apply %X/%X%s", if (log_min_messages <= DEBUG2)
(uint32) (writePtr >> 32), (uint32) writePtr, {
(uint32) (flushPtr >> 32), (uint32) flushPtr, char *replyTimeStr;
(uint32) (applyPtr >> 32), (uint32) applyPtr,
replyRequested ? " (reply requested)" : ""); /* Copy because timestamptz_to_str returns a static buffer */
replyTimeStr = pstrdup(timestamptz_to_str(replyTime));
elog(DEBUG2, "write %X/%X flush %X/%X apply %X/%X%s reply_time %s",
(uint32) (writePtr >> 32), (uint32) writePtr,
(uint32) (flushPtr >> 32), (uint32) flushPtr,
(uint32) (applyPtr >> 32), (uint32) applyPtr,
replyRequested ? " (reply requested)" : "",
replyTimeStr);
pfree(replyTimeStr);
}
/* See if we can compute the round-trip lag for these positions. */ /* See if we can compute the round-trip lag for these positions. */
now = GetCurrentTimestamp(); now = GetCurrentTimestamp();
...@@ -1824,6 +1836,7 @@ ProcessStandbyReplyMessage(void) ...@@ -1824,6 +1836,7 @@ ProcessStandbyReplyMessage(void)
walsnd->flushLag = flushLag; walsnd->flushLag = flushLag;
if (applyLag != -1 || clearLagTimes) if (applyLag != -1 || clearLagTimes)
walsnd->applyLag = applyLag; walsnd->applyLag = applyLag;
walsnd->replyTime = replyTime;
SpinLockRelease(&walsnd->mutex); SpinLockRelease(&walsnd->mutex);
} }
...@@ -1927,23 +1940,47 @@ ProcessStandbyHSFeedbackMessage(void) ...@@ -1927,23 +1940,47 @@ ProcessStandbyHSFeedbackMessage(void)
uint32 feedbackEpoch; uint32 feedbackEpoch;
TransactionId feedbackCatalogXmin; TransactionId feedbackCatalogXmin;
uint32 feedbackCatalogEpoch; uint32 feedbackCatalogEpoch;
TimestampTz replyTime;
/* /*
* Decipher the reply message. The caller already consumed the msgtype * Decipher the reply message. The caller already consumed the msgtype
* byte. See XLogWalRcvSendHSFeedback() in walreceiver.c for the creation * byte. See XLogWalRcvSendHSFeedback() in walreceiver.c for the creation
* of this message. * of this message.
*/ */
(void) pq_getmsgint64(&reply_message); /* sendTime; not used ATM */ replyTime = pq_getmsgint64(&reply_message);
feedbackXmin = pq_getmsgint(&reply_message, 4); feedbackXmin = pq_getmsgint(&reply_message, 4);
feedbackEpoch = pq_getmsgint(&reply_message, 4); feedbackEpoch = pq_getmsgint(&reply_message, 4);
feedbackCatalogXmin = pq_getmsgint(&reply_message, 4); feedbackCatalogXmin = pq_getmsgint(&reply_message, 4);
feedbackCatalogEpoch = pq_getmsgint(&reply_message, 4); feedbackCatalogEpoch = pq_getmsgint(&reply_message, 4);
elog(DEBUG2, "hot standby feedback xmin %u epoch %u, catalog_xmin %u epoch %u", if (log_min_messages <= DEBUG2)
feedbackXmin, {
feedbackEpoch, char *replyTimeStr;
feedbackCatalogXmin,
feedbackCatalogEpoch); /* Copy because timestamptz_to_str returns a static buffer */
replyTimeStr = pstrdup(timestamptz_to_str(replyTime));
elog(DEBUG2, "hot standby feedback xmin %u epoch %u, catalog_xmin %u epoch %u reply_time %s",
feedbackXmin,
feedbackEpoch,
feedbackCatalogXmin,
feedbackCatalogEpoch,
replyTimeStr);
pfree(replyTimeStr);
}
/*
* Update shared state for this WalSender process based on reply data from
* standby.
*/
{
WalSnd *walsnd = MyWalSnd;
SpinLockAcquire(&walsnd->mutex);
walsnd->replyTime = replyTime;
SpinLockRelease(&walsnd->mutex);
}
/* /*
* Unset WalSender's xmins if the feedback message values are invalid. * Unset WalSender's xmins if the feedback message values are invalid.
...@@ -2265,6 +2302,7 @@ InitWalSenderSlot(void) ...@@ -2265,6 +2302,7 @@ InitWalSenderSlot(void)
walsnd->applyLag = -1; walsnd->applyLag = -1;
walsnd->state = WALSNDSTATE_STARTUP; walsnd->state = WALSNDSTATE_STARTUP;
walsnd->latch = &MyProc->procLatch; walsnd->latch = &MyProc->procLatch;
walsnd->replyTime = 0;
SpinLockRelease(&walsnd->mutex); SpinLockRelease(&walsnd->mutex);
/* don't need the lock anymore */ /* don't need the lock anymore */
MyWalSnd = (WalSnd *) walsnd; MyWalSnd = (WalSnd *) walsnd;
...@@ -3179,7 +3217,7 @@ offset_to_interval(TimeOffset offset) ...@@ -3179,7 +3217,7 @@ offset_to_interval(TimeOffset offset)
Datum Datum
pg_stat_get_wal_senders(PG_FUNCTION_ARGS) pg_stat_get_wal_senders(PG_FUNCTION_ARGS)
{ {
#define PG_STAT_GET_WAL_SENDERS_COLS 11 #define PG_STAT_GET_WAL_SENDERS_COLS 12
ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo; ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
TupleDesc tupdesc; TupleDesc tupdesc;
Tuplestorestate *tupstore; Tuplestorestate *tupstore;
...@@ -3233,6 +3271,7 @@ pg_stat_get_wal_senders(PG_FUNCTION_ARGS) ...@@ -3233,6 +3271,7 @@ pg_stat_get_wal_senders(PG_FUNCTION_ARGS)
int priority; int priority;
int pid; int pid;
WalSndState state; WalSndState state;
TimestampTz replyTime;
Datum values[PG_STAT_GET_WAL_SENDERS_COLS]; Datum values[PG_STAT_GET_WAL_SENDERS_COLS];
bool nulls[PG_STAT_GET_WAL_SENDERS_COLS]; bool nulls[PG_STAT_GET_WAL_SENDERS_COLS];
...@@ -3252,6 +3291,7 @@ pg_stat_get_wal_senders(PG_FUNCTION_ARGS) ...@@ -3252,6 +3291,7 @@ pg_stat_get_wal_senders(PG_FUNCTION_ARGS)
flushLag = walsnd->flushLag; flushLag = walsnd->flushLag;
applyLag = walsnd->applyLag; applyLag = walsnd->applyLag;
priority = walsnd->sync_standby_priority; priority = walsnd->sync_standby_priority;
replyTime = walsnd->replyTime;
SpinLockRelease(&walsnd->mutex); SpinLockRelease(&walsnd->mutex);
memset(nulls, 0, sizeof(nulls)); memset(nulls, 0, sizeof(nulls));
...@@ -3328,6 +3368,11 @@ pg_stat_get_wal_senders(PG_FUNCTION_ARGS) ...@@ -3328,6 +3368,11 @@ pg_stat_get_wal_senders(PG_FUNCTION_ARGS)
CStringGetTextDatum("sync") : CStringGetTextDatum("quorum"); CStringGetTextDatum("sync") : CStringGetTextDatum("quorum");
else else
values[10] = CStringGetTextDatum("potential"); values[10] = CStringGetTextDatum("potential");
if (replyTime == 0)
nulls[11] = true;
else
values[11] = TimestampTzGetDatum(replyTime);
} }
tuplestore_putvalues(tupstore, tupdesc, values, nulls); tuplestore_putvalues(tupstore, tupdesc, values, nulls);
......
...@@ -53,6 +53,6 @@ ...@@ -53,6 +53,6 @@
*/ */
/* yyyymmddN */ /* yyyymmddN */
#define CATALOG_VERSION_NO 201811201 #define CATALOG_VERSION_NO 201812091
#endif #endif
...@@ -5023,9 +5023,9 @@ ...@@ -5023,9 +5023,9 @@
proname => 'pg_stat_get_wal_senders', prorows => '10', proisstrict => 'f', proname => 'pg_stat_get_wal_senders', prorows => '10', proisstrict => 'f',
proretset => 't', provolatile => 's', proparallel => 'r', proretset => 't', provolatile => 's', proparallel => 'r',
prorettype => 'record', proargtypes => '', prorettype => 'record', proargtypes => '',
proallargtypes => '{int4,text,pg_lsn,pg_lsn,pg_lsn,pg_lsn,interval,interval,interval,int4,text}', proallargtypes => '{int4,text,pg_lsn,pg_lsn,pg_lsn,pg_lsn,interval,interval,interval,int4,text,timestamptz}',
proargmodes => '{o,o,o,o,o,o,o,o,o,o,o}', proargmodes => '{o,o,o,o,o,o,o,o,o,o,o,o}',
proargnames => '{pid,state,sent_lsn,write_lsn,flush_lsn,replay_lsn,write_lag,flush_lag,replay_lag,sync_priority,sync_state}', proargnames => '{pid,state,sent_lsn,write_lsn,flush_lsn,replay_lsn,write_lag,flush_lag,replay_lag,sync_priority,sync_state,reply_time}',
prosrc => 'pg_stat_get_wal_senders' }, prosrc => 'pg_stat_get_wal_senders' },
{ oid => '3317', descr => 'statistics: information about WAL receiver', { oid => '3317', descr => 'statistics: information about WAL receiver',
proname => 'pg_stat_get_wal_receiver', proisstrict => 'f', provolatile => 's', proname => 'pg_stat_get_wal_receiver', proisstrict => 'f', provolatile => 's',
......
...@@ -75,6 +75,11 @@ typedef struct WalSnd ...@@ -75,6 +75,11 @@ typedef struct WalSnd
* SyncRepLock. * SyncRepLock.
*/ */
int sync_standby_priority; int sync_standby_priority;
/*
* Timestamp of the last message received from standby.
*/
TimestampTz replyTime;
} WalSnd; } WalSnd;
extern WalSnd *MyWalSnd; extern WalSnd *MyWalSnd;
......
...@@ -1861,9 +1861,10 @@ pg_stat_replication| SELECT s.pid, ...@@ -1861,9 +1861,10 @@ pg_stat_replication| SELECT s.pid,
w.flush_lag, w.flush_lag,
w.replay_lag, w.replay_lag,
w.sync_priority, w.sync_priority,
w.sync_state w.sync_state,
w.reply_time
FROM ((pg_stat_get_activity(NULL::integer) s(datid, pid, usesysid, application_name, state, query, wait_event_type, wait_event, xact_start, query_start, backend_start, state_change, client_addr, client_hostname, client_port, backend_xid, backend_xmin, backend_type, ssl, sslversion, sslcipher, sslbits, sslcompression, sslclientdn) FROM ((pg_stat_get_activity(NULL::integer) s(datid, pid, usesysid, application_name, state, query, wait_event_type, wait_event, xact_start, query_start, backend_start, state_change, client_addr, client_hostname, client_port, backend_xid, backend_xmin, backend_type, ssl, sslversion, sslcipher, sslbits, sslcompression, sslclientdn)
JOIN pg_stat_get_wal_senders() w(pid, state, sent_lsn, write_lsn, flush_lsn, replay_lsn, write_lag, flush_lag, replay_lag, sync_priority, sync_state) ON ((s.pid = w.pid))) JOIN pg_stat_get_wal_senders() w(pid, state, sent_lsn, write_lsn, flush_lsn, replay_lsn, write_lag, flush_lag, replay_lag, sync_priority, sync_state, reply_time) ON ((s.pid = w.pid)))
LEFT JOIN pg_authid u ON ((s.usesysid = u.oid))); LEFT JOIN pg_authid u ON ((s.usesysid = u.oid)));
pg_stat_ssl| SELECT s.pid, pg_stat_ssl| SELECT s.pid,
s.ssl, s.ssl,
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment