Commit f5fc2f5b authored by Amit Kapila's avatar Amit Kapila

Add information of total data processed to replication slot stats.

This adds the statistics about total transactions count and total
transaction data logically sent to the decoding output plugin from
ReorderBuffer. Users can query the pg_stat_replication_slots view to check
these stats.

Suggested-by: Andres Freund
Author: Vignesh C and Amit Kapila
Reviewed-by: Sawada Masahiko, Amit Kapila
Discussion: https://postgr.es/m/20210319185247.ldebgpdaxsowiflw@alap3.anarazel.de
parent 1bf946bd
...@@ -17,6 +17,8 @@ ISOLATION_OPTS = --temp-config $(top_srcdir)/contrib/test_decoding/logical.conf ...@@ -17,6 +17,8 @@ ISOLATION_OPTS = --temp-config $(top_srcdir)/contrib/test_decoding/logical.conf
# typical installcheck users do not have (e.g. buildfarm clients). # typical installcheck users do not have (e.g. buildfarm clients).
NO_INSTALLCHECK = 1 NO_INSTALLCHECK = 1
TAP_TESTS = 1
ifdef USE_PGXS ifdef USE_PGXS
PG_CONFIG = pg_config PG_CONFIG = pg_config
PGXS := $(shell $(PG_CONFIG) --pgxs) PGXS := $(shell $(PG_CONFIG) --pgxs)
......
...@@ -8,7 +8,7 @@ SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot', 'test_d ...@@ -8,7 +8,7 @@ SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot', 'test_d
CREATE TABLE stats_test(data text); CREATE TABLE stats_test(data text);
-- function to wait for counters to advance -- function to wait for counters to advance
CREATE FUNCTION wait_for_decode_stats(check_reset bool) RETURNS void AS $$ CREATE FUNCTION wait_for_decode_stats(check_reset bool, check_spill_txns bool) RETURNS void AS $$
DECLARE DECLARE
start_time timestamptz := clock_timestamp(); start_time timestamptz := clock_timestamp();
updated bool; updated bool;
...@@ -16,6 +16,8 @@ BEGIN ...@@ -16,6 +16,8 @@ BEGIN
-- we don't want to wait forever; loop will exit after 30 seconds -- we don't want to wait forever; loop will exit after 30 seconds
FOR i IN 1 .. 300 LOOP FOR i IN 1 .. 300 LOOP
IF check_spill_txns THEN
-- check to see if all updates have been reset/updated -- check to see if all updates have been reset/updated
SELECT CASE WHEN check_reset THEN (spill_txns = 0) SELECT CASE WHEN check_reset THEN (spill_txns = 0)
ELSE (spill_txns > 0) ELSE (spill_txns > 0)
...@@ -23,6 +25,17 @@ BEGIN ...@@ -23,6 +25,17 @@ BEGIN
INTO updated INTO updated
FROM pg_stat_replication_slots WHERE slot_name='regression_slot'; FROM pg_stat_replication_slots WHERE slot_name='regression_slot';
ELSE
-- check to see if all updates have been reset/updated
SELECT CASE WHEN check_reset THEN (total_txns = 0)
ELSE (total_txns > 0)
END
INTO updated
FROM pg_stat_replication_slots WHERE slot_name='regression_slot';
END IF;
exit WHEN updated; exit WHEN updated;
-- wait a little -- wait a little
...@@ -51,16 +64,16 @@ SELECT count(*) FROM pg_logical_slot_peek_changes('regression_slot', NULL, NULL, ...@@ -51,16 +64,16 @@ SELECT count(*) FROM pg_logical_slot_peek_changes('regression_slot', NULL, NULL,
-- Check stats, wait for the stats collector to update. We can't test the -- Check stats, wait for the stats collector to update. We can't test the
-- exact stats count as that can vary if any background transaction (say by -- exact stats count as that can vary if any background transaction (say by
-- autovacuum) happens in parallel to the main transaction. -- autovacuum) happens in parallel to the main transaction.
SELECT wait_for_decode_stats(false); SELECT wait_for_decode_stats(false, true);
wait_for_decode_stats wait_for_decode_stats
----------------------- -----------------------
(1 row) (1 row)
SELECT slot_name, spill_txns > 0 AS spill_txns, spill_count > 0 AS spill_count FROM pg_stat_replication_slots; SELECT slot_name, spill_txns > 0 AS spill_txns, spill_count > 0 AS spill_count, total_txns > 0 AS total_txns, total_bytes > 0 AS total_bytes FROM pg_stat_replication_slots;
slot_name | spill_txns | spill_count slot_name | spill_txns | spill_count | total_txns | total_bytes
-----------------+------------+------------- -----------------+------------+-------------+------------+-------------
regression_slot | t | t regression_slot | t | t | t | t
(1 row) (1 row)
-- reset the slot stats, and wait for stats collector to reset -- reset the slot stats, and wait for stats collector to reset
...@@ -70,16 +83,16 @@ SELECT pg_stat_reset_replication_slot('regression_slot'); ...@@ -70,16 +83,16 @@ SELECT pg_stat_reset_replication_slot('regression_slot');
(1 row) (1 row)
SELECT wait_for_decode_stats(true); SELECT wait_for_decode_stats(true, true);
wait_for_decode_stats wait_for_decode_stats
----------------------- -----------------------
(1 row) (1 row)
SELECT slot_name, spill_txns, spill_count FROM pg_stat_replication_slots; SELECT slot_name, spill_txns, spill_count, total_txns, total_bytes FROM pg_stat_replication_slots;
slot_name | spill_txns | spill_count slot_name | spill_txns | spill_count | total_txns | total_bytes
-----------------+------------+------------- -----------------+------------+-------------+------------+-------------
regression_slot | 0 | 0 regression_slot | 0 | 0 | 0 | 0
(1 row) (1 row)
-- decode and check stats again. -- decode and check stats again.
...@@ -89,16 +102,36 @@ SELECT count(*) FROM pg_logical_slot_peek_changes('regression_slot', NULL, NULL, ...@@ -89,16 +102,36 @@ SELECT count(*) FROM pg_logical_slot_peek_changes('regression_slot', NULL, NULL,
5002 5002
(1 row) (1 row)
SELECT wait_for_decode_stats(false); SELECT wait_for_decode_stats(false, true);
wait_for_decode_stats
-----------------------
(1 row)
SELECT slot_name, spill_txns > 0 AS spill_txns, spill_count > 0 AS spill_count, total_txns > 0 AS total_txns, total_bytes > 0 AS total_bytes FROM pg_stat_replication_slots;
slot_name | spill_txns | spill_count | total_txns | total_bytes
-----------------+------------+-------------+------------+-------------
regression_slot | t | t | t | t
(1 row)
SELECT pg_stat_reset_replication_slot('regression_slot');
pg_stat_reset_replication_slot
--------------------------------
(1 row)
-- non-spilled xact
INSERT INTO stats_test values(generate_series(1, 10));
SELECT wait_for_decode_stats(false, false);
wait_for_decode_stats wait_for_decode_stats
----------------------- -----------------------
(1 row) (1 row)
SELECT slot_name, spill_txns > 0 AS spill_txns, spill_count > 0 AS spill_count FROM pg_stat_replication_slots; SELECT slot_name, spill_txns = 0 AS spill_txns, spill_count = 0 AS spill_count, total_txns > 0 AS total_txns, total_bytes > 0 AS total_bytes FROM pg_stat_replication_slots;
slot_name | spill_txns | spill_count slot_name | spill_txns | spill_count | total_txns | total_bytes
-----------------+------------+------------- -----------------+------------+-------------+------------+-------------
regression_slot | t | t regression_slot | f | f | t | t
(1 row) (1 row)
-- Ensure stats can be repeatedly accessed using the same stats snapshot. See -- Ensure stats can be repeatedly accessed using the same stats snapshot. See
...@@ -117,7 +150,7 @@ SELECT slot_name FROM pg_stat_replication_slots; ...@@ -117,7 +150,7 @@ SELECT slot_name FROM pg_stat_replication_slots;
(1 row) (1 row)
COMMIT; COMMIT;
DROP FUNCTION wait_for_decode_stats(bool); DROP FUNCTION wait_for_decode_stats(bool, bool);
DROP TABLE stats_test; DROP TABLE stats_test;
SELECT pg_drop_replication_slot('regression_slot'); SELECT pg_drop_replication_slot('regression_slot');
pg_drop_replication_slot pg_drop_replication_slot
......
...@@ -6,7 +6,7 @@ SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot', 'test_d ...@@ -6,7 +6,7 @@ SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot', 'test_d
CREATE TABLE stats_test(data text); CREATE TABLE stats_test(data text);
-- function to wait for counters to advance -- function to wait for counters to advance
CREATE FUNCTION wait_for_decode_stats(check_reset bool) RETURNS void AS $$ CREATE FUNCTION wait_for_decode_stats(check_reset bool, check_spill_txns bool) RETURNS void AS $$
DECLARE DECLARE
start_time timestamptz := clock_timestamp(); start_time timestamptz := clock_timestamp();
updated bool; updated bool;
...@@ -14,6 +14,8 @@ BEGIN ...@@ -14,6 +14,8 @@ BEGIN
-- we don't want to wait forever; loop will exit after 30 seconds -- we don't want to wait forever; loop will exit after 30 seconds
FOR i IN 1 .. 300 LOOP FOR i IN 1 .. 300 LOOP
IF check_spill_txns THEN
-- check to see if all updates have been reset/updated -- check to see if all updates have been reset/updated
SELECT CASE WHEN check_reset THEN (spill_txns = 0) SELECT CASE WHEN check_reset THEN (spill_txns = 0)
ELSE (spill_txns > 0) ELSE (spill_txns > 0)
...@@ -21,6 +23,17 @@ BEGIN ...@@ -21,6 +23,17 @@ BEGIN
INTO updated INTO updated
FROM pg_stat_replication_slots WHERE slot_name='regression_slot'; FROM pg_stat_replication_slots WHERE slot_name='regression_slot';
ELSE
-- check to see if all updates have been reset/updated
SELECT CASE WHEN check_reset THEN (total_txns = 0)
ELSE (total_txns > 0)
END
INTO updated
FROM pg_stat_replication_slots WHERE slot_name='regression_slot';
END IF;
exit WHEN updated; exit WHEN updated;
-- wait a little -- wait a little
...@@ -46,18 +59,25 @@ SELECT count(*) FROM pg_logical_slot_peek_changes('regression_slot', NULL, NULL, ...@@ -46,18 +59,25 @@ SELECT count(*) FROM pg_logical_slot_peek_changes('regression_slot', NULL, NULL,
-- Check stats, wait for the stats collector to update. We can't test the -- Check stats, wait for the stats collector to update. We can't test the
-- exact stats count as that can vary if any background transaction (say by -- exact stats count as that can vary if any background transaction (say by
-- autovacuum) happens in parallel to the main transaction. -- autovacuum) happens in parallel to the main transaction.
SELECT wait_for_decode_stats(false); SELECT wait_for_decode_stats(false, true);
SELECT slot_name, spill_txns > 0 AS spill_txns, spill_count > 0 AS spill_count FROM pg_stat_replication_slots; SELECT slot_name, spill_txns > 0 AS spill_txns, spill_count > 0 AS spill_count, total_txns > 0 AS total_txns, total_bytes > 0 AS total_bytes FROM pg_stat_replication_slots;
-- reset the slot stats, and wait for stats collector to reset -- reset the slot stats, and wait for stats collector to reset
SELECT pg_stat_reset_replication_slot('regression_slot'); SELECT pg_stat_reset_replication_slot('regression_slot');
SELECT wait_for_decode_stats(true); SELECT wait_for_decode_stats(true, true);
SELECT slot_name, spill_txns, spill_count FROM pg_stat_replication_slots; SELECT slot_name, spill_txns, spill_count, total_txns, total_bytes FROM pg_stat_replication_slots;
-- decode and check stats again. -- decode and check stats again.
SELECT count(*) FROM pg_logical_slot_peek_changes('regression_slot', NULL, NULL, 'skip-empty-xacts', '1'); SELECT count(*) FROM pg_logical_slot_peek_changes('regression_slot', NULL, NULL, 'skip-empty-xacts', '1');
SELECT wait_for_decode_stats(false); SELECT wait_for_decode_stats(false, true);
SELECT slot_name, spill_txns > 0 AS spill_txns, spill_count > 0 AS spill_count FROM pg_stat_replication_slots; SELECT slot_name, spill_txns > 0 AS spill_txns, spill_count > 0 AS spill_count, total_txns > 0 AS total_txns, total_bytes > 0 AS total_bytes FROM pg_stat_replication_slots;
SELECT pg_stat_reset_replication_slot('regression_slot');
-- non-spilled xact
INSERT INTO stats_test values(generate_series(1, 10));
SELECT wait_for_decode_stats(false, false);
SELECT slot_name, spill_txns = 0 AS spill_txns, spill_count = 0 AS spill_count, total_txns > 0 AS total_txns, total_bytes > 0 AS total_bytes FROM pg_stat_replication_slots;
-- Ensure stats can be repeatedly accessed using the same stats snapshot. See -- Ensure stats can be repeatedly accessed using the same stats snapshot. See
-- https://postgr.es/m/20210317230447.c7uc4g3vbs4wi32i%40alap3.anarazel.de -- https://postgr.es/m/20210317230447.c7uc4g3vbs4wi32i%40alap3.anarazel.de
...@@ -66,6 +86,6 @@ SELECT slot_name FROM pg_stat_replication_slots; ...@@ -66,6 +86,6 @@ SELECT slot_name FROM pg_stat_replication_slots;
SELECT slot_name FROM pg_stat_replication_slots; SELECT slot_name FROM pg_stat_replication_slots;
COMMIT; COMMIT;
DROP FUNCTION wait_for_decode_stats(bool); DROP FUNCTION wait_for_decode_stats(bool, bool);
DROP TABLE stats_test; DROP TABLE stats_test;
SELECT pg_drop_replication_slot('regression_slot'); SELECT pg_drop_replication_slot('regression_slot');
# Test replication statistics data in pg_stat_replication_slots is sane after
# drop replication slot and restart.
use strict;
use warnings;
use PostgresNode;
use TestLib;
use Test::More tests => 1;
# Test set-up
my $node = get_new_node('test');
$node->init(allows_streaming => 'logical');
$node->append_conf('postgresql.conf', 'synchronous_commit = on');
$node->start;
# Create table.
$node->safe_psql('postgres',
"CREATE TABLE test_repl_stat(col1 int)");
# Create replication slots.
$node->safe_psql(
'postgres', qq[
SELECT pg_create_logical_replication_slot('regression_slot1', 'test_decoding');
SELECT pg_create_logical_replication_slot('regression_slot2', 'test_decoding');
SELECT pg_create_logical_replication_slot('regression_slot3', 'test_decoding');
SELECT pg_create_logical_replication_slot('regression_slot4', 'test_decoding');
]);
# Insert some data.
$node->safe_psql('postgres', "INSERT INTO test_repl_stat values(generate_series(1, 5));");
$node->safe_psql(
'postgres', qq[
SELECT data FROM pg_logical_slot_get_changes('regression_slot1', NULL,
NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
SELECT data FROM pg_logical_slot_get_changes('regression_slot2', NULL,
NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
SELECT data FROM pg_logical_slot_get_changes('regression_slot3', NULL,
NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
SELECT data FROM pg_logical_slot_get_changes('regression_slot4', NULL,
NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
]);
# Wait for the statistics to be updated.
$node->poll_query_until(
'postgres', qq[
SELECT count(slot_name) >= 4 FROM pg_stat_replication_slots
WHERE slot_name ~ 'regression_slot'
AND total_txns > 0 AND total_bytes > 0;
]) or die "Timed out while waiting for statistics to be updated";
# Test to drop one of the replication slot and verify replication statistics data is
# fine after restart.
$node->safe_psql('postgres', "SELECT pg_drop_replication_slot('regression_slot4')");
$node->stop;
$node->start;
# Verify statistics data present in pg_stat_replication_slots are sane after
# restart.
my $result = $node->safe_psql('postgres',
"SELECT slot_name, total_txns > 0 AS total_txn,
total_bytes > 0 AS total_bytes FROM pg_stat_replication_slots
ORDER BY slot_name"
);
is($result, qq(regression_slot1|t|t
regression_slot2|t|t
regression_slot3|t|t), 'check replication statistics are updated');
# cleanup
$node->safe_psql('postgres', "DROP TABLE test_repl_stat");
$node->safe_psql('postgres', "SELECT pg_drop_replication_slot('regression_slot1')");
$node->safe_psql('postgres', "SELECT pg_drop_replication_slot('regression_slot2')");
$node->safe_psql('postgres', "SELECT pg_drop_replication_slot('regression_slot3')");
# shutdown
$node->stop;
...@@ -2716,6 +2716,31 @@ SELECT pid, wait_event_type, wait_event FROM pg_stat_activity WHERE wait_event i ...@@ -2716,6 +2716,31 @@ SELECT pid, wait_event_type, wait_event FROM pg_stat_activity WHERE wait_event i
</entry> </entry>
</row> </row>
<row>
<entry role="catalog_table_entry"><para role="column_definition">
<structfield>total_txns</structfield> <type>bigint</type>
</para>
<para>
Number of decoded transactions sent to the decoding output plugin for
this slot. This counter is used to maintain the top level transactions,
so the counter is not incremented for subtransactions. Note that this
includes the transactions that are streamed and/or spilled.
</para></entry>
</row>
<row>
<entry role="catalog_table_entry"><para role="column_definition">
<structfield>total_bytes</structfield><type>bigint</type>
</para>
<para>
Amount of decoded transactions data sent to the decoding output plugin
while decoding the changes from WAL for this slot. This can be used to
gauge the total amount of data sent during logical decoding. Note that
this includes the data that is streamed and/or spilled.
</para>
</entry>
</row>
<row> <row>
<entry role="catalog_table_entry"><para role="column_definition"> <entry role="catalog_table_entry"><para role="column_definition">
<structfield>stats_reset</structfield> <type>timestamp with time zone</type> <structfield>stats_reset</structfield> <type>timestamp with time zone</type>
......
...@@ -875,6 +875,8 @@ CREATE VIEW pg_stat_replication_slots AS ...@@ -875,6 +875,8 @@ CREATE VIEW pg_stat_replication_slots AS
s.stream_txns, s.stream_txns,
s.stream_count, s.stream_count,
s.stream_bytes, s.stream_bytes,
s.total_txns,
s.total_bytes,
s.stats_reset s.stats_reset
FROM pg_stat_get_replication_slots() AS s; FROM pg_stat_get_replication_slots() AS s;
......
...@@ -1829,6 +1829,8 @@ pgstat_report_replslot(const PgStat_ReplSlotStats *repSlotStat) ...@@ -1829,6 +1829,8 @@ pgstat_report_replslot(const PgStat_ReplSlotStats *repSlotStat)
msg.m_stream_txns = repSlotStat->stream_txns; msg.m_stream_txns = repSlotStat->stream_txns;
msg.m_stream_count = repSlotStat->stream_count; msg.m_stream_count = repSlotStat->stream_count;
msg.m_stream_bytes = repSlotStat->stream_bytes; msg.m_stream_bytes = repSlotStat->stream_bytes;
msg.m_total_txns = repSlotStat->total_txns;
msg.m_total_bytes = repSlotStat->total_bytes;
pgstat_send(&msg, sizeof(PgStat_MsgReplSlot)); pgstat_send(&msg, sizeof(PgStat_MsgReplSlot));
} }
...@@ -5568,6 +5570,8 @@ pgstat_recv_replslot(PgStat_MsgReplSlot *msg, int len) ...@@ -5568,6 +5570,8 @@ pgstat_recv_replslot(PgStat_MsgReplSlot *msg, int len)
replSlotStats[idx].stream_txns += msg->m_stream_txns; replSlotStats[idx].stream_txns += msg->m_stream_txns;
replSlotStats[idx].stream_count += msg->m_stream_count; replSlotStats[idx].stream_count += msg->m_stream_count;
replSlotStats[idx].stream_bytes += msg->m_stream_bytes; replSlotStats[idx].stream_bytes += msg->m_stream_bytes;
replSlotStats[idx].total_txns += msg->m_total_txns;
replSlotStats[idx].total_bytes += msg->m_total_bytes;
} }
} }
...@@ -5795,6 +5799,8 @@ pgstat_reset_replslot(int i, TimestampTz ts) ...@@ -5795,6 +5799,8 @@ pgstat_reset_replslot(int i, TimestampTz ts)
replSlotStats[i].stream_txns = 0; replSlotStats[i].stream_txns = 0;
replSlotStats[i].stream_count = 0; replSlotStats[i].stream_count = 0;
replSlotStats[i].stream_bytes = 0; replSlotStats[i].stream_bytes = 0;
replSlotStats[i].total_txns = 0;
replSlotStats[i].total_bytes = 0;
replSlotStats[i].stat_reset_timestamp = ts; replSlotStats[i].stat_reset_timestamp = ts;
} }
......
...@@ -1775,21 +1775,20 @@ UpdateDecodingStats(LogicalDecodingContext *ctx) ...@@ -1775,21 +1775,20 @@ UpdateDecodingStats(LogicalDecodingContext *ctx)
ReorderBuffer *rb = ctx->reorder; ReorderBuffer *rb = ctx->reorder;
PgStat_ReplSlotStats repSlotStat; PgStat_ReplSlotStats repSlotStat;
/* /* Nothing to do if we don't have any replication stats to be sent. */
* Nothing to do if we haven't spilled or streamed anything since the last if (rb->spillBytes <= 0 && rb->streamBytes <= 0 && rb->totalBytes <= 0)
* time the stats has been sent.
*/
if (rb->spillBytes <= 0 && rb->streamBytes <= 0)
return; return;
elog(DEBUG2, "UpdateDecodingStats: updating stats %p %lld %lld %lld %lld %lld %lld", elog(DEBUG2, "UpdateDecodingStats: updating stats %p %lld %lld %lld %lld %lld %lld %lld %lld",
rb, rb,
(long long) rb->spillTxns, (long long) rb->spillTxns,
(long long) rb->spillCount, (long long) rb->spillCount,
(long long) rb->spillBytes, (long long) rb->spillBytes,
(long long) rb->streamTxns, (long long) rb->streamTxns,
(long long) rb->streamCount, (long long) rb->streamCount,
(long long) rb->streamBytes); (long long) rb->streamBytes,
(long long) rb->totalTxns,
(long long) rb->totalBytes);
namestrcpy(&repSlotStat.slotname, NameStr(ctx->slot->data.name)); namestrcpy(&repSlotStat.slotname, NameStr(ctx->slot->data.name));
repSlotStat.spill_txns = rb->spillTxns; repSlotStat.spill_txns = rb->spillTxns;
...@@ -1798,12 +1797,17 @@ UpdateDecodingStats(LogicalDecodingContext *ctx) ...@@ -1798,12 +1797,17 @@ UpdateDecodingStats(LogicalDecodingContext *ctx)
repSlotStat.stream_txns = rb->streamTxns; repSlotStat.stream_txns = rb->streamTxns;
repSlotStat.stream_count = rb->streamCount; repSlotStat.stream_count = rb->streamCount;
repSlotStat.stream_bytes = rb->streamBytes; repSlotStat.stream_bytes = rb->streamBytes;
repSlotStat.total_txns = rb->totalTxns;
repSlotStat.total_bytes = rb->totalBytes;
pgstat_report_replslot(&repSlotStat); pgstat_report_replslot(&repSlotStat);
rb->spillTxns = 0; rb->spillTxns = 0;
rb->spillCount = 0; rb->spillCount = 0;
rb->spillBytes = 0; rb->spillBytes = 0;
rb->streamTxns = 0; rb->streamTxns = 0;
rb->streamCount = 0; rb->streamCount = 0;
rb->streamBytes = 0; rb->streamBytes = 0;
rb->totalTxns = 0;
rb->totalBytes = 0;
} }
...@@ -350,6 +350,8 @@ ReorderBufferAllocate(void) ...@@ -350,6 +350,8 @@ ReorderBufferAllocate(void)
buffer->streamTxns = 0; buffer->streamTxns = 0;
buffer->streamCount = 0; buffer->streamCount = 0;
buffer->streamBytes = 0; buffer->streamBytes = 0;
buffer->totalTxns = 0;
buffer->totalBytes = 0;
buffer->current_restart_decoding_lsn = InvalidXLogRecPtr; buffer->current_restart_decoding_lsn = InvalidXLogRecPtr;
...@@ -1363,6 +1365,11 @@ ReorderBufferIterTXNNext(ReorderBuffer *rb, ReorderBufferIterTXNState *state) ...@@ -1363,6 +1365,11 @@ ReorderBufferIterTXNNext(ReorderBuffer *rb, ReorderBufferIterTXNState *state)
dlist_delete(&change->node); dlist_delete(&change->node);
dlist_push_tail(&state->old_change, &change->node); dlist_push_tail(&state->old_change, &change->node);
/*
* Update the total bytes processed before releasing the current set
* of changes and restoring the new set of changes.
*/
rb->totalBytes += rb->size;
if (ReorderBufferRestoreChanges(rb, entry->txn, &entry->file, if (ReorderBufferRestoreChanges(rb, entry->txn, &entry->file,
&state->entries[off].segno)) &state->entries[off].segno))
{ {
...@@ -2363,6 +2370,20 @@ ReorderBufferProcessTXN(ReorderBuffer *rb, ReorderBufferTXN *txn, ...@@ -2363,6 +2370,20 @@ ReorderBufferProcessTXN(ReorderBuffer *rb, ReorderBufferTXN *txn,
ReorderBufferIterTXNFinish(rb, iterstate); ReorderBufferIterTXNFinish(rb, iterstate);
iterstate = NULL; iterstate = NULL;
/*
* Update total transaction count and total transaction bytes
* processed. Ensure to not count the streamed transaction multiple
* times.
*
* Note that the statistics computation has to be done after
* ReorderBufferIterTXNFinish as it releases the serialized change
* which we have already accounted in ReorderBufferIterTXNNext.
*/
if (!rbtxn_is_streamed(txn))
rb->totalTxns++;
rb->totalBytes += rb->size;
/* /*
* Done with current changes, send the last message for this set of * Done with current changes, send the last message for this set of
* changes depending upon streaming mode. * changes depending upon streaming mode.
......
...@@ -2284,7 +2284,7 @@ pg_stat_get_archiver(PG_FUNCTION_ARGS) ...@@ -2284,7 +2284,7 @@ pg_stat_get_archiver(PG_FUNCTION_ARGS)
Datum Datum
pg_stat_get_replication_slots(PG_FUNCTION_ARGS) pg_stat_get_replication_slots(PG_FUNCTION_ARGS)
{ {
#define PG_STAT_GET_REPLICATION_SLOT_COLS 8 #define PG_STAT_GET_REPLICATION_SLOT_COLS 10
ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo; ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
TupleDesc tupdesc; TupleDesc tupdesc;
Tuplestorestate *tupstore; Tuplestorestate *tupstore;
...@@ -2335,11 +2335,13 @@ pg_stat_get_replication_slots(PG_FUNCTION_ARGS) ...@@ -2335,11 +2335,13 @@ pg_stat_get_replication_slots(PG_FUNCTION_ARGS)
values[4] = Int64GetDatum(s->stream_txns); values[4] = Int64GetDatum(s->stream_txns);
values[5] = Int64GetDatum(s->stream_count); values[5] = Int64GetDatum(s->stream_count);
values[6] = Int64GetDatum(s->stream_bytes); values[6] = Int64GetDatum(s->stream_bytes);
values[7] = Int64GetDatum(s->total_txns);
values[8] = Int64GetDatum(s->total_bytes);
if (s->stat_reset_timestamp == 0) if (s->stat_reset_timestamp == 0)
nulls[7] = true; nulls[9] = true;
else else
values[7] = TimestampTzGetDatum(s->stat_reset_timestamp); values[9] = TimestampTzGetDatum(s->stat_reset_timestamp);
tuplestore_putvalues(tupstore, tupdesc, values, nulls); tuplestore_putvalues(tupstore, tupdesc, values, nulls);
} }
......
...@@ -53,6 +53,6 @@ ...@@ -53,6 +53,6 @@
*/ */
/* yyyymmddN */ /* yyyymmddN */
#define CATALOG_VERSION_NO 202104151 #define CATALOG_VERSION_NO 202104161
#endif #endif
...@@ -5315,9 +5315,9 @@ ...@@ -5315,9 +5315,9 @@
proname => 'pg_stat_get_replication_slots', prorows => '10', proname => 'pg_stat_get_replication_slots', prorows => '10',
proisstrict => 'f', proretset => 't', provolatile => 's', proparallel => 'r', proisstrict => 'f', proretset => 't', provolatile => 's', proparallel => 'r',
prorettype => 'record', proargtypes => '', prorettype => 'record', proargtypes => '',
proallargtypes => '{text,int8,int8,int8,int8,int8,int8,timestamptz}', proallargtypes => '{text,int8,int8,int8,int8,int8,int8,int8,int8,timestamptz}',
proargmodes => '{o,o,o,o,o,o,o,o}', proargmodes => '{o,o,o,o,o,o,o,o,o,o}',
proargnames => '{slot_name,spill_txns,spill_count,spill_bytes,stream_txns,stream_count,stream_bytes,stats_reset}', proargnames => '{slot_name,spill_txns,spill_count,spill_bytes,stream_txns,stream_count,stream_bytes,total_txns,total_bytes,stats_reset}',
prosrc => 'pg_stat_get_replication_slots' }, prosrc => 'pg_stat_get_replication_slots' },
{ oid => '6118', descr => 'statistics: information about subscription', { oid => '6118', descr => 'statistics: information about subscription',
proname => 'pg_stat_get_subscription', prorows => '10', proisstrict => 'f', proname => 'pg_stat_get_subscription', prorows => '10', proisstrict => 'f',
......
...@@ -548,6 +548,8 @@ typedef struct PgStat_MsgReplSlot ...@@ -548,6 +548,8 @@ typedef struct PgStat_MsgReplSlot
PgStat_Counter m_stream_txns; PgStat_Counter m_stream_txns;
PgStat_Counter m_stream_count; PgStat_Counter m_stream_count;
PgStat_Counter m_stream_bytes; PgStat_Counter m_stream_bytes;
PgStat_Counter m_total_txns;
PgStat_Counter m_total_bytes;
} PgStat_MsgReplSlot; } PgStat_MsgReplSlot;
/* ---------- /* ----------
...@@ -924,6 +926,8 @@ typedef struct PgStat_ReplSlotStats ...@@ -924,6 +926,8 @@ typedef struct PgStat_ReplSlotStats
PgStat_Counter stream_txns; PgStat_Counter stream_txns;
PgStat_Counter stream_count; PgStat_Counter stream_count;
PgStat_Counter stream_bytes; PgStat_Counter stream_bytes;
PgStat_Counter total_txns;
PgStat_Counter total_bytes;
TimestampTz stat_reset_timestamp; TimestampTz stat_reset_timestamp;
} PgStat_ReplSlotStats; } PgStat_ReplSlotStats;
......
...@@ -618,6 +618,13 @@ struct ReorderBuffer ...@@ -618,6 +618,13 @@ struct ReorderBuffer
int64 streamTxns; /* number of transactions streamed */ int64 streamTxns; /* number of transactions streamed */
int64 streamCount; /* streaming invocation counter */ int64 streamCount; /* streaming invocation counter */
int64 streamBytes; /* amount of data streamed */ int64 streamBytes; /* amount of data streamed */
/*
* Statistics about all the transactions sent to the decoding output
* plugin
*/
int64 totalTxns; /* total number of transactions sent */
int64 totalBytes; /* total amount of data sent */
}; };
......
...@@ -2068,8 +2068,10 @@ pg_stat_replication_slots| SELECT s.slot_name, ...@@ -2068,8 +2068,10 @@ pg_stat_replication_slots| SELECT s.slot_name,
s.stream_txns, s.stream_txns,
s.stream_count, s.stream_count,
s.stream_bytes, s.stream_bytes,
s.total_txns,
s.total_bytes,
s.stats_reset s.stats_reset
FROM pg_stat_get_replication_slots() s(slot_name, spill_txns, spill_count, spill_bytes, stream_txns, stream_count, stream_bytes, stats_reset); FROM pg_stat_get_replication_slots() s(slot_name, spill_txns, spill_count, spill_bytes, stream_txns, stream_count, stream_bytes, total_txns, total_bytes, stats_reset);
pg_stat_slru| SELECT s.name, pg_stat_slru| SELECT s.name,
s.blks_zeroed, s.blks_zeroed,
s.blks_hit, s.blks_hit,
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment