Commit 592f00f8 authored by Amit Kapila's avatar Amit Kapila

Update replication statistics after every stream/spill.

Currently, replication slot statistics are updated at prepare, commit, and
rollback. Now, if the transaction is interrupted the stats might not get
updated. Fixed this by updating replication statistics after every
stream/spill.

In passing update the docs to change the description of some of the slot
stats.

Author: Vignesh C, Sawada Masahiko
Reviewed-by: Amit Kapila
Discussion: https://postgr.es/m/20210319185247.ldebgpdaxsowiflw@alap3.anarazel.de
parent 7f2e10ba
...@@ -2708,10 +2708,10 @@ SELECT pid, wait_event_type, wait_event FROM pg_stat_activity WHERE wait_event i ...@@ -2708,10 +2708,10 @@ SELECT pid, wait_event_type, wait_event FROM pg_stat_activity WHERE wait_event i
<structfield>stream_bytes</structfield><type>bigint</type> <structfield>stream_bytes</structfield><type>bigint</type>
</para> </para>
<para> <para>
Amount of decoded in-progress transaction data streamed to the decoding Amount of transaction data decoded for streaming in-progress
output plugin while decoding changes from WAL for this slot. This and other transactions to the decoding output plugin while decoding changes from
streaming counters for this slot can be used to gauge the network I/O which WAL for this slot. This and other streaming counters for this slot can
occurred during logical decoding and allow tuning <literal>logical_decoding_work_mem</literal>. be used to tune <literal>logical_decoding_work_mem</literal>.
</para> </para>
</entry> </entry>
</row> </row>
...@@ -2733,10 +2733,9 @@ SELECT pid, wait_event_type, wait_event FROM pg_stat_activity WHERE wait_event i ...@@ -2733,10 +2733,9 @@ SELECT pid, wait_event_type, wait_event FROM pg_stat_activity WHERE wait_event i
<structfield>total_bytes</structfield><type>bigint</type> <structfield>total_bytes</structfield><type>bigint</type>
</para> </para>
<para> <para>
Amount of decoded transaction data sent to the decoding output plugin Amount of transaction data decoded for sending transactions to the
while decoding the changes from WAL for this slot. This can be used to decoding output plugin while decoding changes from WAL for this slot.
gauge the total amount of data sent during logical decoding. Note that Note that this includes data that is streamed and/or spilled.
this includes data that is streamed and/or spilled.
</para> </para>
</entry> </entry>
</row> </row>
......
...@@ -746,9 +746,10 @@ DecodeCommit(LogicalDecodingContext *ctx, XLogRecordBuffer *buf, ...@@ -746,9 +746,10 @@ DecodeCommit(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
} }
/* /*
* Update the decoding stats at transaction prepare/commit/abort. It is * Update the decoding stats at transaction prepare/commit/abort.
* not clear that sending more or less frequently than this would be * Additionally we send the stats when we spill or stream the changes to
* better. * avoid losing them in case the decoding is interrupted. It is not clear
* that sending more or less frequently than this would be better.
*/ */
UpdateDecodingStats(ctx); UpdateDecodingStats(ctx);
} }
...@@ -828,9 +829,10 @@ DecodePrepare(LogicalDecodingContext *ctx, XLogRecordBuffer *buf, ...@@ -828,9 +829,10 @@ DecodePrepare(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
ReorderBufferPrepare(ctx->reorder, xid, parsed->twophase_gid); ReorderBufferPrepare(ctx->reorder, xid, parsed->twophase_gid);
/* /*
* Update the decoding stats at transaction prepare/commit/abort. It is * Update the decoding stats at transaction prepare/commit/abort.
* not clear that sending more or less frequently than this would be * Additionally we send the stats when we spill or stream the changes to
* better. * avoid losing them in case the decoding is interrupted. It is not clear
* that sending more or less frequently than this would be better.
*/ */
UpdateDecodingStats(ctx); UpdateDecodingStats(ctx);
} }
......
...@@ -3559,6 +3559,9 @@ ReorderBufferSerializeTXN(ReorderBuffer *rb, ReorderBufferTXN *txn) ...@@ -3559,6 +3559,9 @@ ReorderBufferSerializeTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
/* don't consider already serialized transactions */ /* don't consider already serialized transactions */
rb->spillTxns += (rbtxn_is_serialized(txn) || rbtxn_is_serialized_clear(txn)) ? 0 : 1; rb->spillTxns += (rbtxn_is_serialized(txn) || rbtxn_is_serialized_clear(txn)) ? 0 : 1;
/* update the decoding stats */
UpdateDecodingStats((LogicalDecodingContext *) rb->private_data);
} }
Assert(spilled == txn->nentries_mem); Assert(spilled == txn->nentries_mem);
...@@ -3928,6 +3931,9 @@ ReorderBufferStreamTXN(ReorderBuffer *rb, ReorderBufferTXN *txn) ...@@ -3928,6 +3931,9 @@ ReorderBufferStreamTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
/* Don't consider already streamed transaction. */ /* Don't consider already streamed transaction. */
rb->streamTxns += (txn_is_streamed) ? 0 : 1; rb->streamTxns += (txn_is_streamed) ? 0 : 1;
/* update the decoding stats */
UpdateDecodingStats((LogicalDecodingContext *) rb->private_data);
Assert(dlist_is_empty(&txn->changes)); Assert(dlist_is_empty(&txn->changes));
Assert(txn->nentries == 0); Assert(txn->nentries == 0);
Assert(txn->nentries_mem == 0); Assert(txn->nentries_mem == 0);
......
...@@ -617,14 +617,14 @@ struct ReorderBuffer ...@@ -617,14 +617,14 @@ struct ReorderBuffer
/* Statistics about transactions streamed to the decoding output plugin */ /* Statistics about transactions streamed to the decoding output plugin */
int64 streamTxns; /* number of transactions streamed */ int64 streamTxns; /* number of transactions streamed */
int64 streamCount; /* streaming invocation counter */ int64 streamCount; /* streaming invocation counter */
int64 streamBytes; /* amount of data streamed */ int64 streamBytes; /* amount of data decoded */
/* /*
* Statistics about all the transactions sent to the decoding output * Statistics about all the transactions sent to the decoding output
* plugin * plugin
*/ */
int64 totalTxns; /* total number of transactions sent */ int64 totalTxns; /* total number of transactions sent */
int64 totalBytes; /* total amount of data sent */ int64 totalBytes; /* total amount of data decoded */
}; };
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment