Commit 98681675 authored by Amit Kapila's avatar Amit Kapila

Track statistics for spilling of changes from ReorderBuffer.

This adds the statistics about transactions spilled to disk from
ReorderBuffer. Users can query the pg_stat_replication_slots view to check
these stats and call pg_stat_reset_replication_slot to reset the stats of
a particular slot. Users can pass NULL in pg_stat_reset_replication_slot
to reset stats of all the slots.

This commit extends the statistics collector to track this information
about slots.

Author: Sawada Masahiko and Amit Kapila
Reviewed-by: Amit Kapila and Dilip Kumar
Discussion: https://postgr.es/m/CA+fd4k5_pPAYRTDrO2PbtTOe0eHQpBvuqmCr8ic39uTNmR49Eg@mail.gmail.com
parent 8d2a01ae
......@@ -314,6 +314,15 @@ postgres 27093 0.0 0.0 30096 2752 ? Ss 11:34 0:00 postgres: ser
</entry>
</row>
<row>
<entry><structname>pg_stat_replication_slots</structname><indexterm><primary>pg_stat_replication_slots</primary></indexterm></entry>
<entry>One row per replication slot, showing statistics about
replication slot usage.
See <link linkend="monitoring-pg-stat-replication-slots-view">
<structname>pg_stat_replication_slots</structname></link> for details.
</entry>
</row>
<row>
<entry><structname>pg_stat_wal_receiver</structname><indexterm><primary>pg_stat_wal_receiver</primary></indexterm></entry>
<entry>Only one row, showing statistics about the WAL receiver from
......@@ -2552,6 +2561,88 @@ SELECT pid, wait_event_type, wait_event FROM pg_stat_activity WHERE wait_event i
</sect2>
<sect2 id="monitoring-pg-stat-replication-slots-view">
<title><structname>pg_stat_replication_slots</structname></title>
<indexterm>
<primary>pg_stat_replication_slots</primary>
</indexterm>
<para>
The <structname>pg_stat_replication_slots</structname> view will contain
one row per logical replication slot, showing statistics about its usage.
</para>
<table id="pg-stat-replication-slots-view" xreflabel="pg_stat_replication_slots">
<title><structname>pg_stat_replication_slots</structname> View</title>
<tgroup cols="1">
<thead>
<row>
<entry role="catalog_table_entry"><para role="column_definition">
Column Type
</para>
<para>
Description
</para></entry>
</row>
</thead>
<tbody>
<row>
<entry role="catalog_table_entry"><para role="column_definition">
<structfield>name</structfield> <type>text</type>
</para>
<para>
A unique, cluster-wide identifier for the replication slot
</para></entry>
</row>
<row>
<entry role="catalog_table_entry"><para role="column_definition">
<structfield>spill_txns</structfield> <type>bigint</type>
</para>
<para>
Number of transactions spilled to disk after the memory used by
logical decoding exceeds <literal>logical_decoding_work_mem</literal>. The
counter gets incremented both for toplevel transactions and
subtransactions.
</para></entry>
</row>
<row>
<entry role="catalog_table_entry"><para role="column_definition">
<structfield>spill_count</structfield> <type>bigint</type>
</para>
<para>
Number of times transactions were spilled to disk. Transactions
may get spilled repeatedly, and this counter gets incremented on every
such invocation.
</para></entry>
</row>
<row>
<entry role="catalog_table_entry"><para role="column_definition">
<structfield>spill_bytes</structfield> <type>bigint</type>
</para>
<para>
Amount of decoded transaction data spilled to disk.
</para></entry>
</row>
<row>
<entry role="catalog_table_entry"><para role="column_definition">
<structfield>stats_reset</structfield> <type>timestamp with time zone</type>
</para>
<para>
Time at which these statistics were last reset
</para></entry>
</row>
</tbody>
</tgroup>
</table>
</sect2>
<sect2 id="monitoring-pg-stat-wal-receiver-view">
<title><structname>pg_stat_wal_receiver</structname></title>
......@@ -4802,6 +4893,27 @@ SELECT pid, wait_event_type, wait_event FROM pg_stat_activity WHERE wait_event i
can be granted EXECUTE to run the function.
</para></entry>
</row>
<row>
<entry role="func_table_entry"><para role="func_signature">
<indexterm>
<primary>pg_stat_reset_replication_slot</primary>
</indexterm>
<function>pg_stat_reset_replication_slot</function> ( <type>text</type> )
<returnvalue>void</returnvalue>
</para>
<para>
Resets statistics to zero for a single replication slot, or for all
replication slots in the cluster. The argument can be either the name
of the slot whose statistics are to be reset, or <literal>NULL</literal>.
If the argument is <literal>NULL</literal>, all counters shown in the
<structname>pg_stat_replication_slots</structname> view are reset for
all replication slots.
</para>
<para>
This function is restricted to superusers by default, but other users
can be granted EXECUTE to run the function.
</para></entry>
</row>
</tbody>
</tgroup>
</table>
......
......@@ -796,6 +796,15 @@ CREATE VIEW pg_stat_replication AS
JOIN pg_stat_get_wal_senders() AS W ON (S.pid = W.pid)
LEFT JOIN pg_authid AS U ON (S.usesysid = U.oid);
-- Spill-to-disk statistics per replication slot, exposing the rows returned
-- by pg_stat_get_replication_slots() under stable column names.
CREATE VIEW pg_stat_replication_slots AS
SELECT
s.name,
s.spill_txns,
s.spill_count,
s.spill_bytes,
s.stats_reset
FROM pg_stat_get_replication_slots() AS s;
CREATE VIEW pg_stat_slru AS
SELECT
s.name,
......@@ -1453,6 +1462,7 @@ REVOKE EXECUTE ON FUNCTION pg_stat_reset_shared(text) FROM public;
REVOKE EXECUTE ON FUNCTION pg_stat_reset_slru(text) FROM public;
REVOKE EXECUTE ON FUNCTION pg_stat_reset_single_table_counters(oid) FROM public;
REVOKE EXECUTE ON FUNCTION pg_stat_reset_single_function_counters(oid) FROM public;
REVOKE EXECUTE ON FUNCTION pg_stat_reset_replication_slot(text) FROM public;
REVOKE EXECUTE ON FUNCTION lo_import(text) FROM public;
REVOKE EXECUTE ON FUNCTION lo_import(text, oid) FROM public;
......
......@@ -51,6 +51,7 @@
#include "postmaster/fork_process.h"
#include "postmaster/interrupt.h"
#include "postmaster/postmaster.h"
#include "replication/slot.h"
#include "replication/walsender.h"
#include "storage/backendid.h"
#include "storage/dsm.h"
......@@ -284,6 +285,8 @@ static PgStat_ArchiverStats archiverStats;
static PgStat_GlobalStats globalStats;
static PgStat_WalStats walStats;
static PgStat_SLRUStats slruStats[SLRU_NUM_ELEMENTS];
static PgStat_ReplSlotStats *replSlotStats;
static int nReplSlotStats;
/*
* List of OIDs of databases we need to write out. If an entry is InvalidOid,
......@@ -324,6 +327,9 @@ static void pgstat_read_current_status(void);
static bool pgstat_write_statsfile_needed(void);
static bool pgstat_db_requested(Oid databaseid);
static int pgstat_replslot_index(const char *name, bool create_it);
static void pgstat_reset_replslot(int i, TimestampTz ts);
static void pgstat_send_tabstat(PgStat_MsgTabstat *tsmsg);
static void pgstat_send_funcstats(void);
static void pgstat_send_slru(void);
......@@ -350,6 +356,7 @@ static void pgstat_recv_resetcounter(PgStat_MsgResetcounter *msg, int len);
static void pgstat_recv_resetsharedcounter(PgStat_MsgResetsharedcounter *msg, int len);
static void pgstat_recv_resetsinglecounter(PgStat_MsgResetsinglecounter *msg, int len);
static void pgstat_recv_resetslrucounter(PgStat_MsgResetslrucounter *msg, int len);
static void pgstat_recv_resetreplslotcounter(PgStat_MsgResetreplslotcounter *msg, int len);
static void pgstat_recv_autovac(PgStat_MsgAutovacStart *msg, int len);
static void pgstat_recv_vacuum(PgStat_MsgVacuum *msg, int len);
static void pgstat_recv_analyze(PgStat_MsgAnalyze *msg, int len);
......@@ -362,6 +369,7 @@ static void pgstat_recv_funcpurge(PgStat_MsgFuncpurge *msg, int len);
static void pgstat_recv_recoveryconflict(PgStat_MsgRecoveryConflict *msg, int len);
static void pgstat_recv_deadlock(PgStat_MsgDeadlock *msg, int len);
static void pgstat_recv_checksum_failure(PgStat_MsgChecksumFailure *msg, int len);
static void pgstat_recv_replslot(PgStat_MsgReplSlot *msg, int len);
static void pgstat_recv_tempfile(PgStat_MsgTempFile *msg, int len);
/* ------------------------------------------------------------
......@@ -1437,6 +1445,61 @@ pgstat_reset_slru_counter(const char *name)
pgstat_send(&msg, sizeof(msg));
}
/* ----------
 * pgstat_reset_replslot_counter() -
 *
 *	Tell the statistics collector to reset a single replication slot
 *	counter, or all replication slots counters (when name is null).
 *
 *	When a name is given, an ERROR is raised if no slot with that name
 *	exists, and nothing is sent if the slot is a physical one.
 *
 *	Permission checking for this function is managed through the normal
 *	GRANT system.
 * ----------
 */
void
pgstat_reset_replslot_counter(const char *name)
{
	PgStat_MsgResetreplslotcounter msg;

	if (pgStatSock == PGINVALID_SOCKET)
		return;

	if (name)
	{
		ReplicationSlot *slot;

		/*
		 * Check if the slot exists with the given name. It is possible that
		 * by the time this message is executed the slot is dropped but at
		 * least this check will ensure that the given name is for a valid
		 * slot.
		 */
		LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
		slot = SearchNamedReplicationSlot(name);
		LWLockRelease(ReplicationSlotControlLock);

		if (!slot)
			ereport(ERROR,
					(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
					 errmsg("replication slot \"%s\" does not exist",
							name)));

		/*
		 * Nothing to do for physical slots as we collect stats only for
		 * logical slots.
		 */
		if (SlotIsPhysical(slot))
			return;

		/*
		 * "name" is an ordinary C string that may be much shorter than
		 * NAMEDATALEN, so copy it as a string rather than as a fixed-size
		 * buffer: a fixed-length memcpy would read past its terminator.
		 */
		strlcpy(msg.m_slotname, name, NAMEDATALEN);
		msg.clearall = false;
	}
	else
		msg.clearall = true;

	pgstat_setheader(&msg.m_hdr, PGSTAT_MTYPE_RESETREPLSLOTCOUNTER);
	pgstat_send(&msg, sizeof(msg));
}
/* ----------
* pgstat_report_autovac() -
*
......@@ -1637,6 +1700,46 @@ pgstat_report_tempfile(size_t filesize)
pgstat_send(&msg, sizeof(msg));
}
/* ----------
 * pgstat_report_replslot() -
 *
 *	Tell the collector about replication slot statistics.
 *
 *	slotname identifies the slot; spilltxns, spillcount and spillbytes are
 *	the increments to add to the slot's counters on the collector side.
 *
 *	NOTE(review): the counters are carried as PgStat_Counter in the message
 *	but arrive here as int, so large values (spilled bytes in particular)
 *	are narrowed at the call.  Widening the parameters needs a matching
 *	change to the prototype and is left to a coordinated follow-up.
 * ----------
 */
void
pgstat_report_replslot(const char *slotname, int spilltxns, int spillcount,
					   int spillbytes)
{
	PgStat_MsgReplSlot msg;

	/*
	 * Prepare and send the message
	 */
	pgstat_setheader(&msg.m_hdr, PGSTAT_MTYPE_REPLSLOT);

	/*
	 * Copy the name as a string: this never reads beyond the terminator of
	 * slotname and always leaves m_slotname NUL-terminated.
	 */
	strlcpy(msg.m_slotname, slotname, NAMEDATALEN);
	msg.m_drop = false;
	msg.m_spill_txns = spilltxns;
	msg.m_spill_count = spillcount;
	msg.m_spill_bytes = spillbytes;
	pgstat_send(&msg, sizeof(PgStat_MsgReplSlot));
}
/* ----------
 * pgstat_report_replslot_drop() -
 *
 *	Tell the collector about dropping the replication slot.
 *
 *	Sends a REPLSLOT message with m_drop set; the spill counters of the
 *	message are not filled in for a drop.
 * ----------
 */
void
pgstat_report_replslot_drop(const char *slotname)
{
	PgStat_MsgReplSlot msg;

	pgstat_setheader(&msg.m_hdr, PGSTAT_MTYPE_REPLSLOT);

	/*
	 * Copy the name as a string: this never reads beyond the terminator of
	 * slotname and always leaves m_slotname NUL-terminated.
	 */
	strlcpy(msg.m_slotname, slotname, NAMEDATALEN);
	msg.m_drop = true;
	pgstat_send(&msg, sizeof(PgStat_MsgReplSlot));
}
/* ----------
* pgstat_ping() -
......@@ -2714,6 +2817,23 @@ pgstat_fetch_slru(void)
return slruStats;
}
/*
 * ---------
 * pgstat_fetch_replslot() -
 *
 *	Support function for the SQL-callable pgstat* functions.  Hands back
 *	the array of replication slot statistics and stores the number of
 *	valid entries in *nslots_p.
 * ---------
 */
PgStat_ReplSlotStats *
pgstat_fetch_replslot(int *nslots_p)
{
	PgStat_ReplSlotStats *stats;

	/* The stats file must be read before the array and count are used. */
	backend_read_statsfile();

	stats = replSlotStats;
	*nslots_p = nReplSlotStats;

	return stats;
}
/* ------------------------------------------------------------
* Functions for management of the shared-memory PgBackendStatus array
......@@ -4693,6 +4813,11 @@ PgstatCollectorMain(int argc, char *argv[])
len);
break;
case PGSTAT_MTYPE_RESETREPLSLOTCOUNTER:
pgstat_recv_resetreplslotcounter(&msg.msg_resetreplslotcounter,
len);
break;
case PGSTAT_MTYPE_AUTOVAC_START:
pgstat_recv_autovac(&msg.msg_autovacuum_start, len);
break;
......@@ -4747,6 +4872,10 @@ PgstatCollectorMain(int argc, char *argv[])
len);
break;
case PGSTAT_MTYPE_REPLSLOT:
pgstat_recv_replslot(&msg.msg_replslot, len);
break;
default:
break;
}
......@@ -4946,6 +5075,7 @@ pgstat_write_statsfiles(bool permanent, bool allDbs)
const char *tmpfile = permanent ? PGSTAT_STAT_PERMANENT_TMPFILE : pgstat_stat_tmpname;
const char *statfile = permanent ? PGSTAT_STAT_PERMANENT_FILENAME : pgstat_stat_filename;
int rc;
int i;
elog(DEBUG2, "writing stats file \"%s\"", statfile);
......@@ -5025,6 +5155,16 @@ pgstat_write_statsfiles(bool permanent, bool allDbs)
(void) rc; /* we'll check for error with ferror */
}
/*
* Write replication slot stats struct
*/
for (i = 0; i < nReplSlotStats; i++)
{
fputc('R', fpout);
rc = fwrite(&replSlotStats[i], sizeof(PgStat_ReplSlotStats), 1, fpout);
(void) rc; /* we'll check for error with ferror */
}
/*
* No more output to be done. Close the temp file and replace the old
* pgstat.stat with it. The ferror() check replaces testing for error
......@@ -5250,6 +5390,10 @@ pgstat_read_statsfiles(Oid onlydb, bool permanent, bool deep)
dbhash = hash_create("Databases hash", PGSTAT_DB_HASH_SIZE, &hash_ctl,
HASH_ELEM | HASH_BLOBS | HASH_CONTEXT);
/* Allocate the space for replication slot statistics */
replSlotStats = palloc0(max_replication_slots * sizeof(PgStat_ReplSlotStats));
nReplSlotStats = 0;
/*
* Clear out global, archiver, WAL and SLRU statistics so they start from
* zero in case we can't load an existing statsfile.
......@@ -5273,6 +5417,12 @@ pgstat_read_statsfiles(Oid onlydb, bool permanent, bool deep)
for (i = 0; i < SLRU_NUM_ELEMENTS; i++)
slruStats[i].stat_reset_timestamp = globalStats.stat_reset_timestamp;
/*
* Set the same reset timestamp for all replication slots too.
*/
for (i = 0; i < max_replication_slots; i++)
replSlotStats[i].stat_reset_timestamp = globalStats.stat_reset_timestamp;
/*
* Try to open the stats file. If it doesn't exist, the backends simply
* return zero for anything and the collector simply starts from scratch
......@@ -5447,6 +5597,23 @@ pgstat_read_statsfiles(Oid onlydb, bool permanent, bool deep)
break;
/*
* 'R' A PgStat_ReplSlotStats struct describing a replication
* slot follows.
*/
case 'R':
if (fread(&replSlotStats[nReplSlotStats], 1, sizeof(PgStat_ReplSlotStats), fpin)
!= sizeof(PgStat_ReplSlotStats))
{
ereport(pgStatRunningInCollector ? LOG : WARNING,
(errmsg("corrupted statistics file \"%s\"",
statfile)));
memset(&replSlotStats[nReplSlotStats], 0, sizeof(PgStat_ReplSlotStats));
goto done;
}
nReplSlotStats++;
break;
case 'E':
goto done;
......@@ -5658,6 +5825,7 @@ pgstat_read_db_statsfile_timestamp(Oid databaseid, bool permanent,
PgStat_ArchiverStats myArchiverStats;
PgStat_WalStats myWalStats;
PgStat_SLRUStats mySLRUStats[SLRU_NUM_ELEMENTS];
PgStat_ReplSlotStats myReplSlotStats;
FILE *fpin;
int32 format_id;
const char *statfile = permanent ? PGSTAT_STAT_PERMANENT_FILENAME : pgstat_stat_filename;
......@@ -5772,6 +5940,22 @@ pgstat_read_db_statsfile_timestamp(Oid databaseid, bool permanent,
break;
/*
* 'R' A PgStat_ReplSlotStats struct describing a replication
* slot follows.
*/
case 'R':
if (fread(&myReplSlotStats, 1, sizeof(PgStat_ReplSlotStats), fpin)
!= sizeof(PgStat_ReplSlotStats))
{
ereport(pgStatRunningInCollector ? LOG : WARNING,
(errmsg("corrupted statistics file \"%s\"",
statfile)));
FreeFile(fpin);
return false;
}
break;
case 'E':
goto done;
......@@ -6367,6 +6551,46 @@ pgstat_recv_resetslrucounter(PgStat_MsgResetslrucounter *msg, int len)
}
}
/* ----------
 * pgstat_recv_resetreplslotcounter() -
 *
 *	Reset the statistics of one replication slot, or of every slot when
 *	the message has clearall set.  All entries reset by one message get
 *	the same reset timestamp.
 * ----------
 */
static void
pgstat_recv_resetreplslotcounter(PgStat_MsgResetreplslotcounter *msg,
								 int len)
{
	TimestampTz now = GetCurrentTimestamp();
	int			slotidx;

	if (!msg->clearall)
	{
		/*
		 * Single-slot request.  A missing entry is not an error: the slot,
		 * and with it the statistics entry, may have been removed before
		 * this reset message was received.
		 */
		slotidx = pgstat_replslot_index(msg->m_slotname, false);
		if (slotidx >= 0)
			pgstat_reset_replslot(slotidx, now);
		return;
	}

	/* Reset every entry currently tracked */
	for (slotidx = 0; slotidx < nReplSlotStats; slotidx++)
		pgstat_reset_replslot(slotidx, now);
}
/* ----------
* pgstat_recv_autovac() -
*
......@@ -6626,6 +6850,51 @@ pgstat_recv_checksum_failure(PgStat_MsgChecksumFailure *msg, int len)
dbentry->last_checksum_failure = msg->m_failure_time;
}
/* ----------
 * pgstat_recv_replslot() -
 *
 *	Process a REPLSLOT message.
 *
 *	A drop message removes the entry of the named slot, if there is one.
 *	Any other message adds the reported spill counters to the slot's
 *	entry, creating the entry first when it does not exist yet.
 * ----------
 */
static void
pgstat_recv_replslot(PgStat_MsgReplSlot *msg, int len)
{
	int			idx;

	/*
	 * Get the index of replication slot statistics. On dropping, we don't
	 * create the new statistics.
	 */
	idx = pgstat_replslot_index(msg->m_slotname, !msg->m_drop);

	/*
	 * The slot entry is not found or there is no space to accommodate the new
	 * entry. This could happen when the message for the creation of a slot
	 * arrived before the drop message even though the actual operations
	 * happened in reverse order. In such a case, the next update of the
	 * statistics for the same slot will create the required entry.
	 */
	if (idx < 0)
		return;

	/* Valid indexes run from 0 to max_replication_slots - 1 */
	Assert(idx >= 0 && idx < max_replication_slots);

	if (msg->m_drop)
	{
		/*
		 * Remove the entry by moving the last entry into its place.  When
		 * the entry being removed already is the last one there is nothing
		 * to move, and memcpy must not be called with identical source and
		 * destination.
		 */
		if (idx < nReplSlotStats - 1)
			memcpy(&replSlotStats[idx], &replSlotStats[nReplSlotStats - 1],
				   sizeof(PgStat_ReplSlotStats));
		nReplSlotStats--;
		Assert(nReplSlotStats >= 0);
	}
	else
	{
		/* Update the replication slot statistics */
		replSlotStats[idx].spill_txns += msg->m_spill_txns;
		replSlotStats[idx].spill_count += msg->m_spill_count;
		replSlotStats[idx].spill_bytes += msg->m_spill_bytes;
	}
}
/* ----------
* pgstat_recv_tempfile() -
*
......@@ -6808,6 +7077,57 @@ pgstat_clip_activity(const char *raw_activity)
return activity;
}
/* ----------
 * pgstat_replslot_index
 *
 * Return the index of entry of a replication slot with the given name, or
 * -1 if the slot is not found.
 *
 * create_it tells whether to create the new slot entry if it is not found.
 * Even with create_it set, -1 is returned when all max_replication_slots
 * entries are already in use.
 * ----------
 */
static int
pgstat_replslot_index(const char *name, bool create_it)
{
	int			i;

	Assert(nReplSlotStats <= max_replication_slots);
	for (i = 0; i < nReplSlotStats; i++)
	{
		if (strcmp(replSlotStats[i].slotname, name) == 0)
			return i;			/* found */
	}

	/*
	 * The slot is not found. We don't want to register the new statistics if
	 * the list is already full or the caller didn't request.
	 */
	if (nReplSlotStats >= max_replication_slots || !create_it)
		return -1;

	/*
	 * Register new slot.  The name is copied as a string so that the stored
	 * slotname is always NUL-terminated, which the strcmp() lookup above
	 * relies on, and so that nothing past the end of "name" is read.
	 */
	memset(&replSlotStats[nReplSlotStats], 0, sizeof(PgStat_ReplSlotStats));
	strlcpy(replSlotStats[nReplSlotStats].slotname, name, NAMEDATALEN);

	return nReplSlotStats++;
}
/* ----------
 * pgstat_reset_replslot
 *
 * Zero the spill counters of the replication slot entry at index 'i' and
 * record 'ts' as the time of the reset.
 * ----------
 */
static void
pgstat_reset_replslot(int i, TimestampTz ts)
{
	PgStat_ReplSlotStats *entry = &replSlotStats[i];

	/* Only the counters are cleared; the slot name stays as it is. */
	entry->spill_txns = 0;
	entry->spill_count = 0;
	entry->spill_bytes = 0;
	entry->stat_reset_timestamp = ts;
}
/*
* pgstat_slru_index
*
......
......@@ -650,6 +650,12 @@ DecodeCommit(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
/* replay actions of all transaction + subtransactions in order */
ReorderBufferCommit(ctx->reorder, xid, buf->origptr, buf->endptr,
commit_time, origin_id, origin_lsn);
/*
* Update the decoding stats at transaction commit/abort. It is not clear
* that sending more or less frequently than this would be better.
*/
UpdateDecodingStats(ctx);
}
/*
......@@ -669,6 +675,9 @@ DecodeAbort(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
}
ReorderBufferAbort(ctx->reorder, xid, buf->record->EndRecPtr);
/* update the decoding stats */
UpdateDecodingStats(ctx);
}
/*
......
......@@ -32,6 +32,7 @@
#include "access/xlog_internal.h"
#include "fmgr.h"
#include "miscadmin.h"
#include "pgstat.h"
#include "replication/decode.h"
#include "replication/logical.h"
#include "replication/origin.h"
......@@ -1460,3 +1461,31 @@ ResetLogicalStreamingState(void)
CheckXidAlive = InvalidTransactionId;
bsysscan = false;
}
/*
 * Report stats for a slot.
 *
 * Sends the spill counters accumulated in the context's ReorderBuffer to
 * the statistics collector under the name of the context's slot, then
 * resets them so that each report carries only the activity since the
 * previous one.
 */
void
UpdateDecodingStats(LogicalDecodingContext *ctx)
{
	ReorderBuffer *rb = ctx->reorder;

	/*
	 * Nothing to do if we haven't spilled anything since the last time the
	 * stats have been sent.
	 */
	if (rb->spillBytes <= 0)
		return;

	elog(DEBUG2, "UpdateDecodingStats: updating stats %p %lld %lld %lld",
		 rb,
		 (long long) rb->spillTxns,
		 (long long) rb->spillCount,
		 (long long) rb->spillBytes);

	/*
	 * NOTE(review): pgstat_report_replslot() is declared with int
	 * parameters while these counters are int64, so large values are
	 * narrowed at this call.
	 */
	pgstat_report_replslot(NameStr(ctx->slot->data.name),
						   rb->spillTxns, rb->spillCount, rb->spillBytes);

	/* Start accumulating afresh for the next report */
	rb->spillTxns = 0;
	rb->spillCount = 0;
	rb->spillBytes = 0;
}
......@@ -343,6 +343,10 @@ ReorderBufferAllocate(void)
buffer->outbufsize = 0;
buffer->size = 0;
buffer->spillTxns = 0;
buffer->spillCount = 0;
buffer->spillBytes = 0;
buffer->current_restart_decoding_lsn = InvalidXLogRecPtr;
dlist_init(&buffer->toplevel_by_lsn);
......@@ -1579,6 +1583,13 @@ ReorderBufferTruncateTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
{
ReorderBufferRestoreCleanup(rb, txn);
txn->txn_flags &= ~RBTXN_IS_SERIALIZED;
/*
* We set this flag to indicate if the transaction is ever serialized.
* We need this to accurately update the stats as otherwise the same
* transaction can be counted as serialized multiple times.
*/
txn->txn_flags |= RBTXN_IS_SERIALIZED_CLEAR;
}
/* also reset the number of entries in the transaction */
......@@ -3112,6 +3123,7 @@ ReorderBufferSerializeTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
int fd = -1;
XLogSegNo curOpenSegNo = 0;
Size spilled = 0;
Size size = txn->size;
elog(DEBUG2, "spill %u changes in XID %u to disk",
(uint32) txn->nentries_mem, txn->xid);
......@@ -3170,6 +3182,16 @@ ReorderBufferSerializeTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
spilled++;
}
/* update the statistics iff we have spilled anything */
if (spilled)
{
rb->spillCount += 1;
rb->spillBytes += size;
/* don't consider already serialized transactions */
rb->spillTxns += (rbtxn_is_serialized(txn) || rbtxn_is_serialized_clear(txn)) ? 0 : 1;
}
Assert(spilled == txn->nentries_mem);
Assert(dlist_is_empty(&txn->changes));
txn->nentries_mem = 0;
......
......@@ -99,7 +99,6 @@ ReplicationSlot *MyReplicationSlot = NULL;
int max_replication_slots = 0; /* the maximum number of replication
* slots */
static ReplicationSlot *SearchNamedReplicationSlot(const char *name);
static int ReplicationSlotAcquireInternal(ReplicationSlot *slot,
const char *name, SlotAcquireBehavior behavior);
static void ReplicationSlotDropAcquired(void);
......@@ -314,6 +313,15 @@ ReplicationSlotCreate(const char *name, bool db_specific,
LWLockRelease(ReplicationSlotControlLock);
/*
* Create statistics entry for the new logical slot. We don't collect any
* stats for physical slots, so no need to create an entry for the same.
* See ReplicationSlotDropPtr for why we need to do this before releasing
* ReplicationSlotAllocationLock.
*/
if (SlotIsLogical(slot))
pgstat_report_replslot(NameStr(slot->data.name), 0, 0, 0);
/*
* Now that the slot has been marked as in_use and active, it's safe to
* let somebody else try to allocate a slot.
......@@ -331,7 +339,7 @@ ReplicationSlotCreate(const char *name, bool db_specific,
*
* The caller must hold ReplicationSlotControlLock in shared mode.
*/
static ReplicationSlot *
ReplicationSlot *
SearchNamedReplicationSlot(const char *name)
{
int i;
......@@ -683,6 +691,19 @@ ReplicationSlotDropPtr(ReplicationSlot *slot)
ereport(WARNING,
(errmsg("could not remove directory \"%s\"", tmppath)));
/*
* Send a message to drop the replication slot to the stats collector.
* Since there is no guarantee of the order of message transfer on a UDP
* connection, it's possible that a message for creating a new slot
* arrives before a message for removing the old slot. We send the drop
* and create messages while holding ReplicationSlotAllocationLock to
* reduce that possibility. If the messages arrive in reverse order, we
* would lose one statistics update message. But the next update message
* will create the statistics for the replication slot.
*/
if (SlotIsLogical(slot))
pgstat_report_replslot_drop(NameStr(slot->data.name));
/*
* We release this at the very end, so that nobody starts trying to create
* a slot while we're still cleaning up the detritus of the old one.
......
......@@ -2069,6 +2069,20 @@ pg_stat_reset_slru(PG_FUNCTION_ARGS)
PG_RETURN_VOID();
}
/*
 * SQL-callable: reset the statistics of the named replication slot, or of
 * all replication slots when the argument is NULL.
 */
Datum
pg_stat_reset_replication_slot(PG_FUNCTION_ARGS)
{
	/* A NULL argument is passed on as a NULL name. */
	char	   *slotname = PG_ARGISNULL(0) ? NULL :
		text_to_cstring(PG_GETARG_TEXT_PP(0));

	pgstat_reset_replslot_counter(slotname);

	PG_RETURN_VOID();
}
Datum
pg_stat_get_archiver(PG_FUNCTION_ARGS)
{
......@@ -2134,3 +2148,69 @@ pg_stat_get_archiver(PG_FUNCTION_ARGS)
/* Returns the record as Datum */
PG_RETURN_DATUM(HeapTupleGetDatum(heap_form_tuple(tupdesc, values, nulls)));
}
/*
 * Get the statistics for the replication slots.
 *
 * Set-returning function in materialize mode: emits one row per entry
 * returned by pgstat_fetch_replslot(), with the columns slot name,
 * spill_txns, spill_count, spill_bytes and the reset timestamp (NULL when
 * the stored timestamp is zero).
 */
Datum
pg_stat_get_replication_slots(PG_FUNCTION_ARGS)
{
#define PG_STAT_GET_REPLICATION_SLOT_COLS 5
	ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
	TupleDesc	tupdesc;
	Tuplestorestate *tupstore;
	MemoryContext per_query_ctx;
	MemoryContext oldcontext;
	PgStat_ReplSlotStats *slotstats;
	int			nstats;
	int			i;

	/* check to see if caller supports us returning a tuplestore */
	if (rsinfo == NULL || !IsA(rsinfo, ReturnSetInfo))
		ereport(ERROR,
				(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
				 errmsg("set-valued function called in context that cannot accept a set")));
	if (!(rsinfo->allowedModes & SFRM_Materialize))
		ereport(ERROR,
				(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
				 errmsg("materialize mode required, but it is not allowed in this context")));

	/* Build a tuple descriptor for our result type */
	if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE)
		elog(ERROR, "return type must be a row type");

	/* The tuplestore is created in the per-query memory context */
	per_query_ctx = rsinfo->econtext->ecxt_per_query_memory;
	oldcontext = MemoryContextSwitchTo(per_query_ctx);

	tupstore = tuplestore_begin_heap(true, false, work_mem);
	rsinfo->returnMode = SFRM_Materialize;
	rsinfo->setResult = tupstore;
	rsinfo->setDesc = tupdesc;

	MemoryContextSwitchTo(oldcontext);

	slotstats = pgstat_fetch_replslot(&nstats);
	for (i = 0; i < nstats; i++)
	{
		Datum		values[PG_STAT_GET_REPLICATION_SLOT_COLS];
		bool		nulls[PG_STAT_GET_REPLICATION_SLOT_COLS];
		PgStat_ReplSlotStats *s = &(slotstats[i]);

		MemSet(values, 0, sizeof(values));
		MemSet(nulls, 0, sizeof(nulls));

		values[0] = PointerGetDatum(cstring_to_text(s->slotname));
		values[1] = Int64GetDatum(s->spill_txns);
		values[2] = Int64GetDatum(s->spill_count);
		values[3] = Int64GetDatum(s->spill_bytes);

		/* A zero timestamp is reported as NULL */
		if (s->stat_reset_timestamp == 0)
			nulls[4] = true;
		else
			values[4] = TimestampTzGetDatum(s->stat_reset_timestamp);

		tuplestore_putvalues(tupstore, tupdesc, values, nulls);
	}

	tuplestore_donestoring(tupstore);

	return (Datum) 0;
}
......@@ -53,6 +53,6 @@
*/
/* yyyymmddN */
#define CATALOG_VERSION_NO 202010021
#define CATALOG_VERSION_NO 202010081
#endif
......@@ -5257,6 +5257,14 @@
proargmodes => '{o,o,o,o,o,o,o,o,o,o,o,o,o,o,o}',
proargnames => '{pid,status,receive_start_lsn,receive_start_tli,written_lsn,flushed_lsn,received_tli,last_msg_send_time,last_msg_receipt_time,latest_end_lsn,latest_end_time,slot_name,sender_host,sender_port,conninfo}',
prosrc => 'pg_stat_get_wal_receiver' },
{ oid => '8595', descr => 'statistics: information about replication slots',
proname => 'pg_stat_get_replication_slots', prorows => '10', proisstrict => 'f',
proretset => 't', provolatile => 's', proparallel => 'r',
prorettype => 'record', proargtypes => '',
proallargtypes => '{text,int8,int8,int8,timestamptz}',
proargmodes => '{o,o,o,o,o}',
proargnames => '{name,spill_txns,spill_count,spill_bytes,stats_reset}',
prosrc => 'pg_stat_get_replication_slots' },
{ oid => '6118', descr => 'statistics: information about subscription',
proname => 'pg_stat_get_subscription', proisstrict => 'f', provolatile => 's',
proparallel => 'r', prorettype => 'record', proargtypes => 'oid',
......@@ -5606,6 +5614,10 @@
descr => 'statistics: reset collected statistics for a single SLRU',
proname => 'pg_stat_reset_slru', proisstrict => 'f', provolatile => 'v',
prorettype => 'void', proargtypes => 'text', prosrc => 'pg_stat_reset_slru' },
{ oid => '8596',
descr => 'statistics: reset collected statistics for a single replication slot',
proname => 'pg_stat_reset_replication_slot', proisstrict => 'f', provolatile => 'v',
prorettype => 'void', proargtypes => 'text', prosrc => 'pg_stat_reset_replication_slot' },
{ oid => '3163', descr => 'current trigger depth',
proname => 'pg_trigger_depth', provolatile => 's', proparallel => 'r',
......
......@@ -56,6 +56,7 @@ typedef enum StatMsgType
PGSTAT_MTYPE_RESETSHAREDCOUNTER,
PGSTAT_MTYPE_RESETSINGLECOUNTER,
PGSTAT_MTYPE_RESETSLRUCOUNTER,
PGSTAT_MTYPE_RESETREPLSLOTCOUNTER,
PGSTAT_MTYPE_AUTOVAC_START,
PGSTAT_MTYPE_VACUUM,
PGSTAT_MTYPE_ANALYZE,
......@@ -68,7 +69,8 @@ typedef enum StatMsgType
PGSTAT_MTYPE_RECOVERYCONFLICT,
PGSTAT_MTYPE_TEMPFILE,
PGSTAT_MTYPE_DEADLOCK,
PGSTAT_MTYPE_CHECKSUMFAILURE
PGSTAT_MTYPE_CHECKSUMFAILURE,
PGSTAT_MTYPE_REPLSLOT,
} StatMsgType;
/* ----------
......@@ -358,6 +360,18 @@ typedef struct PgStat_MsgResetslrucounter
int m_index;
} PgStat_MsgResetslrucounter;
/* ----------
* PgStat_MsgResetreplslotcounter Sent by the backend to tell the collector
* to reset replication slot counter(s)
*
* When clearall is set the collector resets every slot entry and does not
* look at m_slotname; otherwise only the entry named by m_slotname is reset.
* ----------
*/
typedef struct PgStat_MsgResetreplslotcounter
{
/* common message header */
PgStat_MsgHdr m_hdr;
/* name of the slot to reset; filled in only when clearall is false */
char m_slotname[NAMEDATALEN];
/* true: reset all slots; false: reset only the slot in m_slotname */
bool clearall;
} PgStat_MsgResetreplslotcounter;
/* ----------
* PgStat_MsgAutovacStart Sent by the autovacuum daemon to signal
* that a database is going to be processed
......@@ -465,6 +479,22 @@ typedef struct PgStat_MsgSLRU
PgStat_Counter m_truncate;
} PgStat_MsgSLRU;
/* ----------
* PgStat_MsgReplSlot Sent by a backend or a wal sender to update replication
* slot statistics.
*
* The same message type is used to report a dropped slot (m_drop set), in
* which case the sender does not fill in the spill counters.
* ----------
*/
typedef struct PgStat_MsgReplSlot
{
/* common message header */
PgStat_MsgHdr m_hdr;
/* name of the slot the message is about */
char m_slotname[NAMEDATALEN];
/* true: remove the slot's statistics entry; false: add to its counters */
bool m_drop;
/* increments the collector adds to the slot's spill counters */
PgStat_Counter m_spill_txns;
PgStat_Counter m_spill_count;
PgStat_Counter m_spill_bytes;
} PgStat_MsgReplSlot;
/* ----------
* PgStat_MsgRecoveryConflict Sent by the backend upon recovery conflict
* ----------
......@@ -603,6 +633,7 @@ typedef union PgStat_Msg
PgStat_MsgResetsharedcounter msg_resetsharedcounter;
PgStat_MsgResetsinglecounter msg_resetsinglecounter;
PgStat_MsgResetslrucounter msg_resetslrucounter;
PgStat_MsgResetreplslotcounter msg_resetreplslotcounter;
PgStat_MsgAutovacStart msg_autovacuum_start;
PgStat_MsgVacuum msg_vacuum;
PgStat_MsgAnalyze msg_analyze;
......@@ -616,6 +647,7 @@ typedef union PgStat_Msg
PgStat_MsgDeadlock msg_deadlock;
PgStat_MsgTempFile msg_tempfile;
PgStat_MsgChecksumFailure msg_checksumfailure;
PgStat_MsgReplSlot msg_replslot;
} PgStat_Msg;
......@@ -627,7 +659,7 @@ typedef union PgStat_Msg
* ------------------------------------------------------------
*/
#define PGSTAT_FILE_FORMAT_ID 0x01A5BC9E
#define PGSTAT_FILE_FORMAT_ID 0x01A5BC9F
/* ----------
* PgStat_StatDBEntry The collector's data per database
......@@ -782,6 +814,17 @@ typedef struct PgStat_SLRUStats
TimestampTz stat_reset_timestamp;
} PgStat_SLRUStats;
/*
* Replication slot statistics kept in the stats collector
*
* One entry per tracked slot. Entries are written to and read from the
* stats file as raw structs, so the layout is part of the file format.
*/
typedef struct PgStat_ReplSlotStats
{
/* slot name; entries are looked up by comparing this string */
char slotname[NAMEDATALEN];
/* running totals accumulated from the received REPLSLOT messages */
PgStat_Counter spill_txns;
PgStat_Counter spill_count;
PgStat_Counter spill_bytes;
/* time the counters were last reset */
TimestampTz stat_reset_timestamp;
} PgStat_ReplSlotStats;
/* ----------
* Backend states
......@@ -1330,6 +1373,7 @@ extern void pgstat_reset_counters(void);
extern void pgstat_reset_shared_counters(const char *);
extern void pgstat_reset_single_counter(Oid objectid, PgStat_Single_Reset_Type type);
extern void pgstat_reset_slru_counter(const char *);
extern void pgstat_reset_replslot_counter(const char *name);
extern void pgstat_report_autovac(Oid dboid);
extern void pgstat_report_vacuum(Oid tableoid, bool shared,
......@@ -1342,6 +1386,9 @@ extern void pgstat_report_recovery_conflict(int reason);
extern void pgstat_report_deadlock(void);
extern void pgstat_report_checksum_failures_in_db(Oid dboid, int failurecount);
extern void pgstat_report_checksum_failure(void);
extern void pgstat_report_replslot(const char *slotname, int spilltxns, int spillcount,
int spillbytes);
extern void pgstat_report_replslot_drop(const char *slotname);
extern void pgstat_initialize(void);
extern void pgstat_bestart(void);
......@@ -1508,6 +1555,7 @@ extern PgStat_ArchiverStats *pgstat_fetch_stat_archiver(void);
extern PgStat_GlobalStats *pgstat_fetch_global(void);
extern PgStat_WalStats *pgstat_fetch_stat_wal(void);
extern PgStat_SLRUStats *pgstat_fetch_slru(void);
extern PgStat_ReplSlotStats *pgstat_fetch_replslot(int *nslots_p);
extern void pgstat_count_slru_page_zeroed(int slru_idx);
extern void pgstat_count_slru_page_hit(int slru_idx);
......
......@@ -122,5 +122,6 @@ extern void LogicalConfirmReceivedLocation(XLogRecPtr lsn);
extern bool filter_by_origin_cb_wrapper(LogicalDecodingContext *ctx, RepOriginId origin_id);
extern void ResetLogicalStreamingState(void);
extern void UpdateDecodingStats(LogicalDecodingContext *ctx);
#endif
......@@ -162,9 +162,10 @@ typedef struct ReorderBufferChange
#define RBTXN_HAS_CATALOG_CHANGES 0x0001
#define RBTXN_IS_SUBXACT 0x0002
#define RBTXN_IS_SERIALIZED 0x0004
#define RBTXN_IS_STREAMED 0x0008
#define RBTXN_HAS_TOAST_INSERT 0x0010
#define RBTXN_HAS_SPEC_INSERT 0x0020
#define RBTXN_IS_SERIALIZED_CLEAR 0x0008
#define RBTXN_IS_STREAMED 0x0010
#define RBTXN_HAS_TOAST_INSERT 0x0020
#define RBTXN_HAS_SPEC_INSERT 0x0040
/* Does the transaction have catalog changes? */
#define rbtxn_has_catalog_changes(txn) \
......@@ -184,6 +185,12 @@ typedef struct ReorderBufferChange
((txn)->txn_flags & RBTXN_IS_SERIALIZED) != 0 \
)
/*
 * Is RBTXN_IS_SERIALIZED_CLEAR set for this transaction, i.e. has it been
 * spilled to disk at some point in the past?
 */
#define rbtxn_is_serialized_clear(txn) \
	(((txn)->txn_flags & RBTXN_IS_SERIALIZED_CLEAR) != 0)
/* This transaction's changes has toast insert, without main table insert. */
#define rbtxn_has_toast_insert(txn) \
( \
......@@ -525,6 +532,17 @@ struct ReorderBuffer
/* memory accounting */
Size size;
/*
* Statistics about transactions spilled to disk.
*
* A single transaction may be spilled repeatedly, which is why we keep
* two different counters. For spilling, the transaction counter includes
* both toplevel transactions and subtransactions.
*/
int64 spillTxns; /* number of transactions spilled to disk */
int64 spillCount; /* spill-to-disk invocation counter */
int64 spillBytes; /* amount of data spilled to disk */
};
......
......@@ -210,6 +210,7 @@ extern XLogRecPtr ReplicationSlotsComputeLogicalRestartLSN(void);
extern bool ReplicationSlotsCountDBSlots(Oid dboid, int *nslots, int *nactive);
extern void ReplicationSlotsDropDBSlots(Oid dboid);
extern void InvalidateObsoleteReplicationSlots(XLogSegNo oldestSegno);
extern ReplicationSlot *SearchNamedReplicationSlot(const char *name);
extern void StartupReplicationSlots(void);
extern void CheckPointReplicationSlots(void);
......
......@@ -2018,6 +2018,12 @@ pg_stat_replication| SELECT s.pid,
FROM ((pg_stat_get_activity(NULL::integer) s(datid, pid, usesysid, application_name, state, query, wait_event_type, wait_event, xact_start, query_start, backend_start, state_change, client_addr, client_hostname, client_port, backend_xid, backend_xmin, backend_type, ssl, sslversion, sslcipher, sslbits, sslcompression, ssl_client_dn, ssl_client_serial, ssl_issuer_dn, gss_auth, gss_princ, gss_enc, leader_pid)
JOIN pg_stat_get_wal_senders() w(pid, state, sent_lsn, write_lsn, flush_lsn, replay_lsn, write_lag, flush_lag, replay_lag, sync_priority, sync_state, reply_time) ON ((s.pid = w.pid)))
LEFT JOIN pg_authid u ON ((s.usesysid = u.oid)));
pg_stat_replication_slots| SELECT s.name,
s.spill_txns,
s.spill_count,
s.spill_bytes,
s.stats_reset
FROM pg_stat_get_replication_slots() s(name, spill_txns, spill_count, spill_bytes, stats_reset);
pg_stat_slru| SELECT s.name,
s.blks_zeroed,
s.blks_hit,
......
......@@ -1832,7 +1832,9 @@ PgStat_MsgFuncstat
PgStat_MsgHdr
PgStat_MsgInquiry
PgStat_MsgRecoveryConflict
PgStat_MsgReplSlot
PgStat_MsgResetcounter
PgStat_MsgResetreplslotcounter
PgStat_MsgResetsharedcounter
PgStat_MsgResetsinglecounter
PgStat_MsgResetslrucounter
......@@ -1842,6 +1844,7 @@ PgStat_MsgTabstat
PgStat_MsgTempFile
PgStat_MsgVacuum
PgStat_MsgWal
PgStat_ReplSlotStats
PgStat_SLRUStats
PgStat_Shared_Reset_Target
PgStat_Single_Reset_Type
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment