Commit 9290ad19 authored by Amit Kapila's avatar Amit Kapila

Track statistics for spilling of changes from ReorderBuffer.

This adds the statistics about transactions spilled to disk from
ReorderBuffer.  Users can query the pg_stat_replication view to check
these stats.

Author: Tomas Vondra, with bug-fixes and minor changes by Dilip Kumar
Reviewed-by: Amit Kapila
Discussion: https://postgr.es/m/688b0b7f-2f6c-d827-c27b-216a8e3ea700@2ndquadrant.com
parent 168d2064
...@@ -1972,6 +1972,26 @@ SELECT pid, wait_event_type, wait_event FROM pg_stat_activity WHERE wait_event i ...@@ -1972,6 +1972,26 @@ SELECT pid, wait_event_type, wait_event FROM pg_stat_activity WHERE wait_event i
<entry><type>timestamp with time zone</type></entry> <entry><type>timestamp with time zone</type></entry>
<entry>Send time of last reply message received from standby server</entry> <entry>Send time of last reply message received from standby server</entry>
</row> </row>
<row>
<entry><structfield>spill_bytes</structfield></entry>
<entry><type>bigint</type></entry>
<entry>Amount of decoded transaction data spilled to disk.</entry>
</row>
<row>
<entry><structfield>spill_txns</structfield></entry>
<entry><type>bigint</type></entry>
<entry>Number of transactions spilled to disk after the memory used by
logical decoding exceeds <literal>logical_decoding_work_mem</literal>. The
counter gets incremented both for toplevel transactions and
subtransactions.</entry>
</row>
<row>
<entry><structfield>spill_count</structfield></entry>
<entry><type>bigint</type></entry>
<entry>Number of times transactions were spilled to disk. Transactions
may get spilled repeatedly, and this counter gets incremented on every
such invocation.</entry>
</row>
</tbody> </tbody>
</tgroup> </tgroup>
</table> </table>
......
...@@ -776,7 +776,10 @@ CREATE VIEW pg_stat_replication AS ...@@ -776,7 +776,10 @@ CREATE VIEW pg_stat_replication AS
W.replay_lag, W.replay_lag,
W.sync_priority, W.sync_priority,
W.sync_state, W.sync_state,
W.reply_time W.reply_time,
W.spill_txns,
W.spill_count,
W.spill_bytes
FROM pg_stat_get_activity(NULL) AS S FROM pg_stat_get_activity(NULL) AS S
JOIN pg_stat_get_wal_senders() AS W ON (S.pid = W.pid) JOIN pg_stat_get_wal_senders() AS W ON (S.pid = W.pid)
LEFT JOIN pg_authid AS U ON (S.usesysid = U.oid); LEFT JOIN pg_authid AS U ON (S.usesysid = U.oid);
......
...@@ -308,6 +308,10 @@ ReorderBufferAllocate(void) ...@@ -308,6 +308,10 @@ ReorderBufferAllocate(void)
buffer->outbufsize = 0; buffer->outbufsize = 0;
buffer->size = 0; buffer->size = 0;
buffer->spillCount = 0;
buffer->spillTxns = 0;
buffer->spillBytes = 0;
buffer->current_restart_decoding_lsn = InvalidXLogRecPtr; buffer->current_restart_decoding_lsn = InvalidXLogRecPtr;
dlist_init(&buffer->toplevel_by_lsn); dlist_init(&buffer->toplevel_by_lsn);
...@@ -2415,6 +2419,7 @@ ReorderBufferSerializeTXN(ReorderBuffer *rb, ReorderBufferTXN *txn) ...@@ -2415,6 +2419,7 @@ ReorderBufferSerializeTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
int fd = -1; int fd = -1;
XLogSegNo curOpenSegNo = 0; XLogSegNo curOpenSegNo = 0;
Size spilled = 0; Size spilled = 0;
Size size = txn->size;
elog(DEBUG2, "spill %u changes in XID %u to disk", elog(DEBUG2, "spill %u changes in XID %u to disk",
(uint32) txn->nentries_mem, txn->xid); (uint32) txn->nentries_mem, txn->xid);
...@@ -2473,6 +2478,13 @@ ReorderBufferSerializeTXN(ReorderBuffer *rb, ReorderBufferTXN *txn) ...@@ -2473,6 +2478,13 @@ ReorderBufferSerializeTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
spilled++; spilled++;
} }
/* update the statistics */
rb->spillCount += 1;
rb->spillBytes += size;
/* Don't consider already serialized transaction. */
rb->spillTxns += txn->serialized ? 0 : 1;
Assert(spilled == txn->nentries_mem); Assert(spilled == txn->nentries_mem);
Assert(dlist_is_empty(&txn->changes)); Assert(dlist_is_empty(&txn->changes));
txn->nentries_mem = 0; txn->nentries_mem = 0;
......
...@@ -248,6 +248,7 @@ static void LagTrackerWrite(XLogRecPtr lsn, TimestampTz local_flush_time); ...@@ -248,6 +248,7 @@ static void LagTrackerWrite(XLogRecPtr lsn, TimestampTz local_flush_time);
static TimeOffset LagTrackerRead(int head, XLogRecPtr lsn, TimestampTz now); static TimeOffset LagTrackerRead(int head, XLogRecPtr lsn, TimestampTz now);
static bool TransactionIdInRecentPast(TransactionId xid, uint32 epoch); static bool TransactionIdInRecentPast(TransactionId xid, uint32 epoch);
static void UpdateSpillStats(LogicalDecodingContext *ctx);
static void XLogRead(WALSegmentContext *segcxt, char *buf, XLogRecPtr startptr, Size count); static void XLogRead(WALSegmentContext *segcxt, char *buf, XLogRecPtr startptr, Size count);
...@@ -1261,7 +1262,8 @@ WalSndWriteData(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid, ...@@ -1261,7 +1262,8 @@ WalSndWriteData(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid,
/* /*
* LogicalDecodingContext 'update_progress' callback. * LogicalDecodingContext 'update_progress' callback.
* *
* Write the current position to the lag tracker (see XLogSendPhysical). * Write the current position to the lag tracker (see XLogSendPhysical),
* and update the spill statistics.
*/ */
static void static void
WalSndUpdateProgress(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid) WalSndUpdateProgress(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid)
...@@ -1280,6 +1282,11 @@ WalSndUpdateProgress(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId ...@@ -1280,6 +1282,11 @@ WalSndUpdateProgress(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId
LagTrackerWrite(lsn, now); LagTrackerWrite(lsn, now);
sendTime = now; sendTime = now;
/*
* Update statistics about transactions that spilled to disk.
*/
UpdateSpillStats(ctx);
} }
/* /*
...@@ -2318,6 +2325,9 @@ InitWalSenderSlot(void) ...@@ -2318,6 +2325,9 @@ InitWalSenderSlot(void)
walsnd->state = WALSNDSTATE_STARTUP; walsnd->state = WALSNDSTATE_STARTUP;
walsnd->latch = &MyProc->procLatch; walsnd->latch = &MyProc->procLatch;
walsnd->replyTime = 0; walsnd->replyTime = 0;
walsnd->spillTxns = 0;
walsnd->spillCount = 0;
walsnd->spillBytes = 0;
SpinLockRelease(&walsnd->mutex); SpinLockRelease(&walsnd->mutex);
/* don't need the lock anymore */ /* don't need the lock anymore */
MyWalSnd = (WalSnd *) walsnd; MyWalSnd = (WalSnd *) walsnd;
...@@ -3219,7 +3229,7 @@ offset_to_interval(TimeOffset offset) ...@@ -3219,7 +3229,7 @@ offset_to_interval(TimeOffset offset)
Datum Datum
pg_stat_get_wal_senders(PG_FUNCTION_ARGS) pg_stat_get_wal_senders(PG_FUNCTION_ARGS)
{ {
#define PG_STAT_GET_WAL_SENDERS_COLS 12 #define PG_STAT_GET_WAL_SENDERS_COLS 15
ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo; ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
TupleDesc tupdesc; TupleDesc tupdesc;
Tuplestorestate *tupstore; Tuplestorestate *tupstore;
...@@ -3274,6 +3284,9 @@ pg_stat_get_wal_senders(PG_FUNCTION_ARGS) ...@@ -3274,6 +3284,9 @@ pg_stat_get_wal_senders(PG_FUNCTION_ARGS)
int pid; int pid;
WalSndState state; WalSndState state;
TimestampTz replyTime; TimestampTz replyTime;
int64 spillTxns;
int64 spillCount;
int64 spillBytes;
Datum values[PG_STAT_GET_WAL_SENDERS_COLS]; Datum values[PG_STAT_GET_WAL_SENDERS_COLS];
bool nulls[PG_STAT_GET_WAL_SENDERS_COLS]; bool nulls[PG_STAT_GET_WAL_SENDERS_COLS];
...@@ -3294,6 +3307,9 @@ pg_stat_get_wal_senders(PG_FUNCTION_ARGS) ...@@ -3294,6 +3307,9 @@ pg_stat_get_wal_senders(PG_FUNCTION_ARGS)
applyLag = walsnd->applyLag; applyLag = walsnd->applyLag;
priority = walsnd->sync_standby_priority; priority = walsnd->sync_standby_priority;
replyTime = walsnd->replyTime; replyTime = walsnd->replyTime;
spillTxns = walsnd->spillTxns;
spillCount = walsnd->spillCount;
spillBytes = walsnd->spillBytes;
SpinLockRelease(&walsnd->mutex); SpinLockRelease(&walsnd->mutex);
memset(nulls, 0, sizeof(nulls)); memset(nulls, 0, sizeof(nulls));
...@@ -3375,6 +3391,11 @@ pg_stat_get_wal_senders(PG_FUNCTION_ARGS) ...@@ -3375,6 +3391,11 @@ pg_stat_get_wal_senders(PG_FUNCTION_ARGS)
nulls[11] = true; nulls[11] = true;
else else
values[11] = TimestampTzGetDatum(replyTime); values[11] = TimestampTzGetDatum(replyTime);
/* spill to disk */
values[12] = Int64GetDatum(spillTxns);
values[13] = Int64GetDatum(spillCount);
values[14] = Int64GetDatum(spillBytes);
} }
tuplestore_putvalues(tupstore, tupdesc, values, nulls); tuplestore_putvalues(tupstore, tupdesc, values, nulls);
...@@ -3611,3 +3632,20 @@ LagTrackerRead(int head, XLogRecPtr lsn, TimestampTz now) ...@@ -3611,3 +3632,20 @@ LagTrackerRead(int head, XLogRecPtr lsn, TimestampTz now)
Assert(time != 0); Assert(time != 0);
return now - time; return now - time;
} }
static void
UpdateSpillStats(LogicalDecodingContext *ctx)
{
ReorderBuffer *rb = ctx->reorder;
SpinLockAcquire(&MyWalSnd->mutex);
MyWalSnd->spillTxns = rb->spillTxns;
MyWalSnd->spillCount = rb->spillCount;
MyWalSnd->spillBytes = rb->spillBytes;
elog(DEBUG2, "UpdateSpillStats: updating stats %p %ld %ld %ld",
rb, rb->spillTxns, rb->spillCount, rb->spillBytes);
SpinLockRelease(&MyWalSnd->mutex);
}
...@@ -53,6 +53,6 @@ ...@@ -53,6 +53,6 @@
*/ */
/* yyyymmddN */ /* yyyymmddN */
#define CATALOG_VERSION_NO 201911121 #define CATALOG_VERSION_NO 201911211
#endif #endif
...@@ -5166,9 +5166,9 @@ ...@@ -5166,9 +5166,9 @@
proname => 'pg_stat_get_wal_senders', prorows => '10', proisstrict => 'f', proname => 'pg_stat_get_wal_senders', prorows => '10', proisstrict => 'f',
proretset => 't', provolatile => 's', proparallel => 'r', proretset => 't', provolatile => 's', proparallel => 'r',
prorettype => 'record', proargtypes => '', prorettype => 'record', proargtypes => '',
proallargtypes => '{int4,text,pg_lsn,pg_lsn,pg_lsn,pg_lsn,interval,interval,interval,int4,text,timestamptz}', proallargtypes => '{int4,text,pg_lsn,pg_lsn,pg_lsn,pg_lsn,interval,interval,interval,int4,text,timestamptz,int8,int8,int8}',
proargmodes => '{o,o,o,o,o,o,o,o,o,o,o,o}', proargmodes => '{o,o,o,o,o,o,o,o,o,o,o,o,o,o,o}',
proargnames => '{pid,state,sent_lsn,write_lsn,flush_lsn,replay_lsn,write_lag,flush_lag,replay_lag,sync_priority,sync_state,reply_time}', proargnames => '{pid,state,sent_lsn,write_lsn,flush_lsn,replay_lsn,write_lag,flush_lag,replay_lag,sync_priority,sync_state,reply_time,spill_txns,spill_count,spill_bytes}',
prosrc => 'pg_stat_get_wal_senders' }, prosrc => 'pg_stat_get_wal_senders' },
{ oid => '3317', descr => 'statistics: information about WAL receiver', { oid => '3317', descr => 'statistics: information about WAL receiver',
proname => 'pg_stat_get_wal_receiver', proisstrict => 'f', provolatile => 's', proname => 'pg_stat_get_wal_receiver', proisstrict => 'f', provolatile => 's',
......
...@@ -402,6 +402,17 @@ struct ReorderBuffer ...@@ -402,6 +402,17 @@ struct ReorderBuffer
/* memory accounting */ /* memory accounting */
Size size; Size size;
/*
* Statistics about transactions spilled to disk.
*
* A single transaction may be spilled repeatedly, which is why we keep
* two different counters. For spilling, the transaction counter includes
* both toplevel transactions and subtransactions.
*/
int64 spillCount; /* spill-to-disk invocation counter */
int64 spillTxns; /* number of transactions spilled to disk */
int64 spillBytes; /* amount of data spilled to disk */
}; };
......
...@@ -80,6 +80,11 @@ typedef struct WalSnd ...@@ -80,6 +80,11 @@ typedef struct WalSnd
* Timestamp of the last message received from standby. * Timestamp of the last message received from standby.
*/ */
TimestampTz replyTime; TimestampTz replyTime;
/* Statistics for transactions spilled to disk. */
int64 spillTxns;
int64 spillCount;
int64 spillBytes;
} WalSnd; } WalSnd;
extern WalSnd *MyWalSnd; extern WalSnd *MyWalSnd;
......
...@@ -1952,9 +1952,12 @@ pg_stat_replication| SELECT s.pid, ...@@ -1952,9 +1952,12 @@ pg_stat_replication| SELECT s.pid,
w.replay_lag, w.replay_lag,
w.sync_priority, w.sync_priority,
w.sync_state, w.sync_state,
w.reply_time w.reply_time,
w.spill_txns,
w.spill_count,
w.spill_bytes
FROM ((pg_stat_get_activity(NULL::integer) s(datid, pid, usesysid, application_name, state, query, wait_event_type, wait_event, xact_start, query_start, backend_start, state_change, client_addr, client_hostname, client_port, backend_xid, backend_xmin, backend_type, ssl, sslversion, sslcipher, sslbits, sslcompression, ssl_client_dn, ssl_client_serial, ssl_issuer_dn, gss_auth, gss_princ, gss_enc) FROM ((pg_stat_get_activity(NULL::integer) s(datid, pid, usesysid, application_name, state, query, wait_event_type, wait_event, xact_start, query_start, backend_start, state_change, client_addr, client_hostname, client_port, backend_xid, backend_xmin, backend_type, ssl, sslversion, sslcipher, sslbits, sslcompression, ssl_client_dn, ssl_client_serial, ssl_issuer_dn, gss_auth, gss_princ, gss_enc)
JOIN pg_stat_get_wal_senders() w(pid, state, sent_lsn, write_lsn, flush_lsn, replay_lsn, write_lag, flush_lag, replay_lag, sync_priority, sync_state, reply_time) ON ((s.pid = w.pid))) JOIN pg_stat_get_wal_senders() w(pid, state, sent_lsn, write_lsn, flush_lsn, replay_lsn, write_lag, flush_lag, replay_lag, sync_priority, sync_state, reply_time, spill_txns, spill_count, spill_bytes) ON ((s.pid = w.pid)))
LEFT JOIN pg_authid u ON ((s.usesysid = u.oid))); LEFT JOIN pg_authid u ON ((s.usesysid = u.oid)));
pg_stat_ssl| SELECT s.pid, pg_stat_ssl| SELECT s.pid,
s.ssl, s.ssl,
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment