Commit 3fa17d37 authored by Amit Kapila

Use HTAB for replication slot statistics.

Previously, we used an array of size max_replication_slots to
store stats for replication slots. But that had two problems in the cases
where a message for dropping a slot gets lost: 1) the stats for the new
slot are not recorded if the array is full and 2) we write beyond the end
of the array if the user reduces max_replication_slots.

This commit uses HTAB for replication slot statistics, resolving both
problems. Now, pgstat_vacuum_stat() searches for all the dead replication
slots in the stats hashtable and tells the collector to remove them. To avoid
showing the stats for already-dropped slots, the pg_stat_replication_slots
view searches slot stats by the slot name taken from pg_replication_slots.

Also, we send a message for creating a slot at slot creation, initializing
the stats. This reduces the possibility that the stats are accumulated
into the old slot stats when a message for dropping a slot gets lost.

Reported-by: Andres Freund
Author: Sawada Masahiko, test case by Vignesh C
Reviewed-by: Amit Kapila, Vignesh C, Dilip Kumar
Discussion: https://postgr.es/m/20210319185247.ldebgpdaxsowiflw@alap3.anarazel.de
parent e7eea52b
......@@ -2,9 +2,10 @@
# drop replication slot and restart.
use strict;
use warnings;
use File::Path qw(rmtree);
use PostgresNode;
use TestLib;
use Test::More tests => 1;
use Test::More tests => 2;
# Test set-up
my $node = get_new_node('test');
......@@ -12,9 +13,22 @@ $node->init(allows_streaming => 'logical');
$node->append_conf('postgresql.conf', 'synchronous_commit = on');
$node->start;
# Check that replication slot stats are expected.
sub test_slot_stats
{
	# Arguments: the node to query, the psql output we expect back, and the
	# description to attach to the test result.
	my ($node, $expected, $msg) = @_;

	# Report, per slot, only whether the transaction and byte counters are
	# non-zero, ordered by slot name so the output is stable.
	my $query = qq[
SELECT slot_name, total_txns > 0 AS total_txn,
total_bytes > 0 AS total_bytes
FROM pg_stat_replication_slots
ORDER BY slot_name];
	my $actual = $node->safe_psql('postgres', $query);

	is($actual, $expected, $msg);
}
# Create table.
$node->safe_psql('postgres',
"CREATE TABLE test_repl_stat(col1 int)");
$node->safe_psql('postgres', "CREATE TABLE test_repl_stat(col1 int)");
# Create replication slots.
$node->safe_psql(
......@@ -26,7 +40,8 @@ $node->safe_psql(
]);
# Insert some data.
$node->safe_psql('postgres', "INSERT INTO test_repl_stat values(generate_series(1, 5));");
$node->safe_psql('postgres',
"INSERT INTO test_repl_stat values(generate_series(1, 5));");
$node->safe_psql(
'postgres', qq[
......@@ -50,27 +65,51 @@ $node->poll_query_until(
# Drop one of the replication slots and verify that the replication statistics
# data is fine after restart.
$node->safe_psql('postgres', "SELECT pg_drop_replication_slot('regression_slot4')");
$node->safe_psql('postgres',
"SELECT pg_drop_replication_slot('regression_slot4')");
$node->stop;
$node->start;
# Verify statistics data present in pg_stat_replication_slots are sane after
# restart.
my $result = $node->safe_psql('postgres',
"SELECT slot_name, total_txns > 0 AS total_txn,
total_bytes > 0 AS total_bytes FROM pg_stat_replication_slots
ORDER BY slot_name"
);
is($result, qq(regression_slot1|t|t
test_slot_stats(
$node,
qq(regression_slot1|t|t
regression_slot2|t|t
regression_slot3|t|t), 'check replication statistics are updated');
regression_slot3|t|t),
'check replication statistics are updated');
# Test to remove one of the replication slots and adjust
# max_replication_slots accordingly to the number of slots. This leads
# to a mismatch between the number of slots present in the stats file and the
# number of stats present in the shared memory, simulating the scenario where
# the drop slot message is lost by the statistics collector process. We verify
# that the replication statistics data is fine after restart.
$node->stop;
my $datadir = $node->data_dir;
my $slot3_replslotdir = "$datadir/pg_replslot/regression_slot3";
rmtree($slot3_replslotdir);
$node->append_conf('postgresql.conf', 'max_replication_slots = 2');
$node->start;
# Verify statistics data present in pg_stat_replication_slots are sane after
# restart.
test_slot_stats(
$node,
qq(regression_slot1|t|t
regression_slot2|t|t),
'check replication statistics after removing the slot file');
# cleanup
$node->safe_psql('postgres', "DROP TABLE test_repl_stat");
$node->safe_psql('postgres', "SELECT pg_drop_replication_slot('regression_slot1')");
$node->safe_psql('postgres', "SELECT pg_drop_replication_slot('regression_slot2')");
$node->safe_psql('postgres', "SELECT pg_drop_replication_slot('regression_slot3')");
$node->safe_psql('postgres',
"SELECT pg_drop_replication_slot('regression_slot1')");
$node->safe_psql('postgres',
"SELECT pg_drop_replication_slot('regression_slot2')");
# shutdown
$node->stop;
......@@ -866,20 +866,6 @@ CREATE VIEW pg_stat_replication AS
JOIN pg_stat_get_wal_senders() AS W ON (S.pid = W.pid)
LEFT JOIN pg_authid AS U ON (S.usesysid = U.oid);
CREATE VIEW pg_stat_replication_slots AS
SELECT
s.slot_name,
s.spill_txns,
s.spill_count,
s.spill_bytes,
s.stream_txns,
s.stream_count,
s.stream_bytes,
s.total_txns,
s.total_bytes,
s.stats_reset
FROM pg_stat_get_replication_slots() AS s;
CREATE VIEW pg_stat_slru AS
SELECT
s.name,
......@@ -984,6 +970,22 @@ CREATE VIEW pg_replication_slots AS
FROM pg_get_replication_slots() AS L
LEFT JOIN pg_database D ON (L.datoid = D.oid);
-- Statistics for replication slots.  The slot names come from
-- pg_replication_slots and the statistics for each slot are looked up by
-- name through pg_stat_get_replication_slot(), so only slots listed in
-- pg_replication_slots produce a row here.
CREATE VIEW pg_stat_replication_slots AS
SELECT
s.slot_name,
s.spill_txns,
s.spill_count,
s.spill_bytes,
s.stream_txns,
s.stream_count,
s.stream_bytes,
s.total_txns,
s.total_bytes,
s.stats_reset
FROM pg_replication_slots as r,
LATERAL pg_stat_get_replication_slot(slot_name) as s
WHERE r.datoid IS NOT NULL; -- excluding physical slots
CREATE VIEW pg_stat_database AS
SELECT
D.oid AS datid,
......
......@@ -106,6 +106,7 @@
#define PGSTAT_DB_HASH_SIZE 16
#define PGSTAT_TAB_HASH_SIZE 512
#define PGSTAT_FUNCTION_HASH_SIZE 512
#define PGSTAT_REPLSLOT_HASH_SIZE 32
/* ----------
......@@ -278,8 +279,7 @@ static PgStat_ArchiverStats archiverStats;
static PgStat_GlobalStats globalStats;
static PgStat_WalStats walStats;
static PgStat_SLRUStats slruStats[SLRU_NUM_ELEMENTS];
static PgStat_ReplSlotStats *replSlotStats;
static int nReplSlotStats;
static HTAB *replSlotStatHash = NULL;
static PgStat_RecoveryPrefetchStats recoveryPrefetchStats;
/*
......@@ -319,8 +319,8 @@ static void backend_read_statsfile(void);
static bool pgstat_write_statsfile_needed(void);
static bool pgstat_db_requested(Oid databaseid);
static int pgstat_replslot_index(const char *name, bool create_it);
static void pgstat_reset_replslot(int i, TimestampTz ts);
static PgStat_StatReplSlotEntry *pgstat_get_replslot_entry(NameData name, bool create_it);
static void pgstat_reset_replslot(PgStat_StatReplSlotEntry *slotstats, TimestampTz ts);
static void pgstat_send_tabstat(PgStat_MsgTabstat *tsmsg);
static void pgstat_send_funcstats(void);
......@@ -1109,6 +1109,24 @@ pgstat_vacuum_stat(void)
/* Clean up */
hash_destroy(htab);
/*
* Search for all the dead replication slots in stats hashtable and tell
* the stats collector to drop them.
*/
if (replSlotStatHash)
{
PgStat_StatReplSlotEntry *slotentry;
hash_seq_init(&hstat, replSlotStatHash);
while ((slotentry = (PgStat_StatReplSlotEntry *) hash_seq_search(&hstat)) != NULL)
{
CHECK_FOR_INTERRUPTS();
if (SearchNamedReplicationSlot(NameStr(slotentry->slotname), true) == NULL)
pgstat_report_replslot_drop(NameStr(slotentry->slotname));
}
}
/*
* Lookup our own database entry; if not found, nothing more to do.
*/
......@@ -1516,30 +1534,6 @@ pgstat_reset_replslot_counter(const char *name)
if (name)
{
ReplicationSlot *slot;
/*
* Check if the slot exists with the given name. It is possible that by
* the time this message is executed the slot is dropped but at least
* this check will ensure that the given name is for a valid slot.
*/
LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
slot = SearchNamedReplicationSlot(name);
LWLockRelease(ReplicationSlotControlLock);
if (!slot)
ereport(ERROR,
(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
errmsg("replication slot \"%s\" does not exist",
name)));
/*
* Nothing to do for physical slots as we collect stats only for
* logical slots.
*/
if (SlotIsPhysical(slot))
return;
namestrcpy(&msg.m_slotname, name);
msg.clearall = false;
}
......@@ -1813,7 +1807,7 @@ pgstat_report_tempfile(size_t filesize)
* ----------
*/
void
pgstat_report_replslot(const PgStat_ReplSlotStats *repSlotStat)
pgstat_report_replslot(const PgStat_StatReplSlotEntry *repSlotStat)
{
PgStat_MsgReplSlot msg;
......@@ -1822,6 +1816,7 @@ pgstat_report_replslot(const PgStat_ReplSlotStats *repSlotStat)
*/
pgstat_setheader(&msg.m_hdr, PGSTAT_MTYPE_REPLSLOT);
namestrcpy(&msg.m_slotname, NameStr(repSlotStat->slotname));
msg.m_create = false;
msg.m_drop = false;
msg.m_spill_txns = repSlotStat->spill_txns;
msg.m_spill_count = repSlotStat->spill_count;
......@@ -1834,6 +1829,24 @@ pgstat_report_replslot(const PgStat_ReplSlotStats *repSlotStat)
pgstat_send(&msg, sizeof(PgStat_MsgReplSlot));
}
/* ----------
* pgstat_report_replslot_create() -
*
* Tell the collector about creating the replication slot.
* ----------
*/
void
pgstat_report_replslot_create(const char *slotname)
{
	PgStat_MsgReplSlot msg;

	pgstat_setheader(&msg.m_hdr, PGSTAT_MTYPE_REPLSLOT);

	/*
	 * A creation message carries only the slot name and the create/drop
	 * flags; none of the counter fields are filled in here.
	 */
	msg.m_drop = false;
	msg.m_create = true;
	namestrcpy(&msg.m_slotname, slotname);

	pgstat_send(&msg, sizeof(PgStat_MsgReplSlot));
}
/* ----------
* pgstat_report_replslot_drop() -
*
......@@ -1847,6 +1860,7 @@ pgstat_report_replslot_drop(const char *slotname)
pgstat_setheader(&msg.m_hdr, PGSTAT_MTYPE_REPLSLOT);
namestrcpy(&msg.m_slotname, slotname);
msg.m_create = false;
msg.m_drop = true;
pgstat_send(&msg, sizeof(PgStat_MsgReplSlot));
}
......@@ -2872,17 +2886,15 @@ pgstat_fetch_slru(void)
* pgstat_fetch_replslot() -
*
* Support function for the SQL-callable pgstat* functions. Returns
* a pointer to the replication slot statistics struct and sets the
* number of entries in nslots_p.
* a pointer to the replication slot statistics struct.
* ---------
*/
PgStat_ReplSlotStats *
pgstat_fetch_replslot(int *nslots_p)
PgStat_StatReplSlotEntry *
pgstat_fetch_replslot(NameData slotname)
{
backend_read_statsfile();
*nslots_p = nReplSlotStats;
return replSlotStats;
return pgstat_get_replslot_entry(slotname, false);
}
/*
......@@ -3654,7 +3666,6 @@ pgstat_write_statsfiles(bool permanent, bool allDbs)
const char *tmpfile = permanent ? PGSTAT_STAT_PERMANENT_TMPFILE : pgstat_stat_tmpname;
const char *statfile = permanent ? PGSTAT_STAT_PERMANENT_FILENAME : pgstat_stat_filename;
int rc;
int i;
elog(DEBUG2, "writing stats file \"%s\"", statfile);
......@@ -3744,11 +3755,17 @@ pgstat_write_statsfiles(bool permanent, bool allDbs)
/*
* Write replication slot stats struct
*/
for (i = 0; i < nReplSlotStats; i++)
if (replSlotStatHash)
{
fputc('R', fpout);
rc = fwrite(&replSlotStats[i], sizeof(PgStat_ReplSlotStats), 1, fpout);
(void) rc; /* we'll check for error with ferror */
PgStat_StatReplSlotEntry *slotent;
hash_seq_init(&hstat, replSlotStatHash);
while ((slotent = (PgStat_StatReplSlotEntry *) hash_seq_search(&hstat)) != NULL)
{
fputc('R', fpout);
rc = fwrite(slotent, sizeof(PgStat_StatReplSlotEntry), 1, fpout);
(void) rc; /* we'll check for error with ferror */
}
}
/*
......@@ -3975,12 +3992,6 @@ pgstat_read_statsfiles(Oid onlydb, bool permanent, bool deep)
dbhash = hash_create("Databases hash", PGSTAT_DB_HASH_SIZE, &hash_ctl,
HASH_ELEM | HASH_BLOBS | HASH_CONTEXT);
/* Allocate the space for replication slot statistics */
replSlotStats = MemoryContextAllocZero(pgStatLocalContext,
max_replication_slots
* sizeof(PgStat_ReplSlotStats));
nReplSlotStats = 0;
/*
* Clear out global, archiver, WAL and SLRU statistics so they start from
* zero in case we can't load an existing statsfile.
......@@ -4005,12 +4016,6 @@ pgstat_read_statsfiles(Oid onlydb, bool permanent, bool deep)
for (i = 0; i < SLRU_NUM_ELEMENTS; i++)
slruStats[i].stat_reset_timestamp = globalStats.stat_reset_timestamp;
/*
* Set the same reset timestamp for all replication slots too.
*/
for (i = 0; i < max_replication_slots; i++)
replSlotStats[i].stat_reset_timestamp = globalStats.stat_reset_timestamp;
/*
* Try to open the stats file. If it doesn't exist, the backends simply
* return zero for anything and the collector simply starts from scratch
......@@ -4197,21 +4202,43 @@ pgstat_read_statsfiles(Oid onlydb, bool permanent, bool deep)
break;
/*
* 'R' A PgStat_ReplSlotStats struct describing a replication
* slot follows.
* 'R' A PgStat_StatReplSlotEntry struct describing a
* replication slot follows.
*/
case 'R':
if (fread(&replSlotStats[nReplSlotStats], 1, sizeof(PgStat_ReplSlotStats), fpin)
!= sizeof(PgStat_ReplSlotStats))
{
ereport(pgStatRunningInCollector ? LOG : WARNING,
(errmsg("corrupted statistics file \"%s\"",
statfile)));
memset(&replSlotStats[nReplSlotStats], 0, sizeof(PgStat_ReplSlotStats));
goto done;
PgStat_StatReplSlotEntry slotbuf;
PgStat_StatReplSlotEntry *slotent;
if (fread(&slotbuf, 1, sizeof(PgStat_StatReplSlotEntry), fpin)
!= sizeof(PgStat_StatReplSlotEntry))
{
ereport(pgStatRunningInCollector ? LOG : WARNING,
(errmsg("corrupted statistics file \"%s\"",
statfile)));
goto done;
}
/* Create hash table if we don't have it already. */
if (replSlotStatHash == NULL)
{
HASHCTL hash_ctl;
hash_ctl.keysize = sizeof(NameData);
hash_ctl.entrysize = sizeof(PgStat_StatReplSlotEntry);
hash_ctl.hcxt = pgStatLocalContext;
replSlotStatHash = hash_create("Replication slots hash",
PGSTAT_REPLSLOT_HASH_SIZE,
&hash_ctl,
HASH_ELEM | HASH_BLOBS | HASH_CONTEXT);
}
slotent = (PgStat_StatReplSlotEntry *) hash_search(replSlotStatHash,
(void *) &slotbuf.slotname,
HASH_ENTER, NULL);
memcpy(slotent, &slotbuf, sizeof(PgStat_StatReplSlotEntry));
break;
}
nReplSlotStats++;
break;
case 'E':
goto done;
......@@ -4424,7 +4451,7 @@ pgstat_read_db_statsfile_timestamp(Oid databaseid, bool permanent,
PgStat_ArchiverStats myArchiverStats;
PgStat_WalStats myWalStats;
PgStat_SLRUStats mySLRUStats[SLRU_NUM_ELEMENTS];
PgStat_ReplSlotStats myReplSlotStats;
PgStat_StatReplSlotEntry myReplSlotStats;
PgStat_RecoveryPrefetchStats myRecoveryPrefetchStats;
FILE *fpin;
int32 format_id;
......@@ -4553,12 +4580,12 @@ pgstat_read_db_statsfile_timestamp(Oid databaseid, bool permanent,
break;
/*
* 'R' A PgStat_ReplSlotStats struct describing a replication
* slot follows.
* 'R' A PgStat_StatReplSlotEntry struct describing a
* replication slot follows.
*/
case 'R':
if (fread(&myReplSlotStats, 1, sizeof(PgStat_ReplSlotStats), fpin)
!= sizeof(PgStat_ReplSlotStats))
if (fread(&myReplSlotStats, 1, sizeof(PgStat_StatReplSlotEntry), fpin)
!= sizeof(PgStat_StatReplSlotEntry))
{
ereport(pgStatRunningInCollector ? LOG : WARNING,
(errmsg("corrupted statistics file \"%s\"",
......@@ -4764,8 +4791,7 @@ pgstat_clear_snapshot(void)
/* Reset variables */
pgStatLocalContext = NULL;
pgStatDBHash = NULL;
replSlotStats = NULL;
nReplSlotStats = 0;
replSlotStatHash = NULL;
/*
* Historically the backend_status.c facilities lived in this file, and
......@@ -5189,20 +5215,26 @@ static void
pgstat_recv_resetreplslotcounter(PgStat_MsgResetreplslotcounter *msg,
int len)
{
int i;
int idx = -1;
PgStat_StatReplSlotEntry *slotent;
TimestampTz ts;
/* Return if we don't have replication slot statistics */
if (replSlotStatHash == NULL)
return;
ts = GetCurrentTimestamp();
if (msg->clearall)
{
for (i = 0; i < nReplSlotStats; i++)
pgstat_reset_replslot(i, ts);
HASH_SEQ_STATUS sstat;
hash_seq_init(&sstat, replSlotStatHash);
while ((slotent = (PgStat_StatReplSlotEntry *) hash_seq_search(&sstat)) != NULL)
pgstat_reset_replslot(slotent, ts);
}
else
{
/* Get the index of replication slot statistics to reset */
idx = pgstat_replslot_index(NameStr(msg->m_slotname), false);
/* Get the slot statistics to reset */
slotent = pgstat_get_replslot_entry(msg->m_slotname, false);
/*
* Nothing to do if the given slot entry is not found. This could
......@@ -5210,11 +5242,11 @@ pgstat_recv_resetreplslotcounter(PgStat_MsgResetreplslotcounter *msg,
* corresponding statistics entry is also removed before receiving the
* reset message.
*/
if (idx < 0)
if (!slotent)
return;
/* Reset the stats for the requested replication slot */
pgstat_reset_replslot(idx, ts);
pgstat_reset_replslot(slotent, ts);
}
}
......@@ -5532,46 +5564,45 @@ pgstat_recv_checksum_failure(PgStat_MsgChecksumFailure *msg, int len)
static void
pgstat_recv_replslot(PgStat_MsgReplSlot *msg, int len)
{
int idx;
/*
* Get the index of replication slot statistics. On dropping, we don't
* create the new statistics.
*/
idx = pgstat_replslot_index(NameStr(msg->m_slotname), !msg->m_drop);
/*
* The slot entry is not found or there is no space to accommodate the new
* entry. This could happen when the message for the creation of a slot
* reached before the drop message even though the actual operations
* happen in reverse order. In such a case, the next update of the
* statistics for the same slot will create the required entry.
*/
if (idx < 0)
return;
/* it must be a valid replication slot index */
Assert(idx < nReplSlotStats);
if (msg->m_drop)
{
Assert(!msg->m_create);
/* Remove the replication slot statistics with the given name */
if (idx < nReplSlotStats - 1)
memcpy(&replSlotStats[idx], &replSlotStats[nReplSlotStats - 1],
sizeof(PgStat_ReplSlotStats));
nReplSlotStats--;
if (replSlotStatHash != NULL)
(void) hash_search(replSlotStatHash,
(void *) &(msg->m_slotname),
HASH_REMOVE,
NULL);
}
else
{
/* Update the replication slot statistics */
replSlotStats[idx].spill_txns += msg->m_spill_txns;
replSlotStats[idx].spill_count += msg->m_spill_count;
replSlotStats[idx].spill_bytes += msg->m_spill_bytes;
replSlotStats[idx].stream_txns += msg->m_stream_txns;
replSlotStats[idx].stream_count += msg->m_stream_count;
replSlotStats[idx].stream_bytes += msg->m_stream_bytes;
replSlotStats[idx].total_txns += msg->m_total_txns;
replSlotStats[idx].total_bytes += msg->m_total_bytes;
PgStat_StatReplSlotEntry *slotent;
slotent = pgstat_get_replslot_entry(msg->m_slotname, true);
Assert(slotent);
if (msg->m_create)
{
/*
* If the message for dropping the slot with the same name gets
* lost, slotent has stats for the old slot. So we initialize all
* counters at slot creation.
*/
pgstat_reset_replslot(slotent, 0);
}
else
{
/* Update the replication slot statistics */
slotent->spill_txns += msg->m_spill_txns;
slotent->spill_count += msg->m_spill_count;
slotent->spill_bytes += msg->m_spill_bytes;
slotent->stream_txns += msg->m_stream_txns;
slotent->stream_count += msg->m_stream_count;
slotent->stream_bytes += msg->m_stream_bytes;
slotent->total_txns += msg->m_total_txns;
slotent->total_bytes += msg->m_total_bytes;
}
}
}
......@@ -5749,59 +5780,80 @@ pgstat_db_requested(Oid databaseid)
}
/* ----------
* pgstat_replslot_index
* pgstat_get_replslot_entry
*
* Return the index of entry of a replication slot with the given name, or
* -1 if the slot is not found.
* Return the entry of replication slot stats with the given name. Return
* NULL if not found and the caller didn't request to create it.
*
* create_it tells whether to create the new slot entry if it is not found.
* create tells whether to create the new slot entry if it is not found.
* ----------
*/
static int
pgstat_replslot_index(const char *name, bool create_it)
static PgStat_StatReplSlotEntry *
pgstat_get_replslot_entry(NameData name, bool create)
{
int i;
PgStat_StatReplSlotEntry *slotent;
bool found;
Assert(nReplSlotStats <= max_replication_slots);
for (i = 0; i < nReplSlotStats; i++)
if (replSlotStatHash == NULL)
{
if (namestrcmp(&replSlotStats[i].slotname, name) == 0)
return i; /* found */
HASHCTL hash_ctl;
/*
* Quick return NULL if the hash table is empty and the caller didn't
* request to create the entry.
*/
if (!create)
return NULL;
hash_ctl.keysize = sizeof(NameData);
hash_ctl.entrysize = sizeof(PgStat_StatReplSlotEntry);
replSlotStatHash = hash_create("Replication slots hash",
PGSTAT_REPLSLOT_HASH_SIZE,
&hash_ctl,
HASH_ELEM | HASH_BLOBS);
}
/*
* The slot is not found. We don't want to register the new statistics if
* the list is already full or the caller didn't request.
*/
if (i == max_replication_slots || !create_it)
return -1;
slotent = (PgStat_StatReplSlotEntry *) hash_search(replSlotStatHash,
(void *) &name,
create ? HASH_ENTER : HASH_FIND,
&found);
/* Register new slot */
memset(&replSlotStats[nReplSlotStats], 0, sizeof(PgStat_ReplSlotStats));
namestrcpy(&replSlotStats[nReplSlotStats].slotname, name);
if (!slotent)
{
/* not found */
Assert(!create && !found);
return NULL;
}
/* initialize the entry */
if (create && !found)
{
namestrcpy(&(slotent->slotname), NameStr(name));
pgstat_reset_replslot(slotent, 0);
}
return nReplSlotStats++;
return slotent;
}
/* ----------
* pgstat_reset_replslot
*
* Reset the replication slot stats at index 'i'.
* Reset the given replication slot stats.
* ----------
*/
static void
pgstat_reset_replslot(int i, TimestampTz ts)
pgstat_reset_replslot(PgStat_StatReplSlotEntry *slotent, TimestampTz ts)
{
/* reset only counters. Don't clear slot name */
replSlotStats[i].spill_txns = 0;
replSlotStats[i].spill_count = 0;
replSlotStats[i].spill_bytes = 0;
replSlotStats[i].stream_txns = 0;
replSlotStats[i].stream_count = 0;
replSlotStats[i].stream_bytes = 0;
replSlotStats[i].total_txns = 0;
replSlotStats[i].total_bytes = 0;
replSlotStats[i].stat_reset_timestamp = ts;
slotent->spill_txns = 0;
slotent->spill_count = 0;
slotent->spill_bytes = 0;
slotent->stream_txns = 0;
slotent->stream_count = 0;
slotent->stream_bytes = 0;
slotent->total_txns = 0;
slotent->total_bytes = 0;
slotent->stat_reset_timestamp = ts;
}
/*
......
......@@ -1773,7 +1773,7 @@ void
UpdateDecodingStats(LogicalDecodingContext *ctx)
{
ReorderBuffer *rb = ctx->reorder;
PgStat_ReplSlotStats repSlotStat;
PgStat_StatReplSlotEntry repSlotStat;
/* Nothing to do if we don't have any replication stats to be sent. */
if (rb->spillBytes <= 0 && rb->streamBytes <= 0 && rb->totalBytes <= 0)
......
......@@ -328,12 +328,7 @@ ReplicationSlotCreate(const char *name, bool db_specific,
* ReplicationSlotAllocationLock.
*/
if (SlotIsLogical(slot))
{
PgStat_ReplSlotStats repSlotStat;
MemSet(&repSlotStat, 0, sizeof(PgStat_ReplSlotStats));
namestrcpy(&repSlotStat.slotname, NameStr(slot->data.name));
pgstat_report_replslot(&repSlotStat);
}
pgstat_report_replslot_create(NameStr(slot->data.name));
/*
* Now that the slot has been marked as in_use and active, it's safe to
......@@ -349,17 +344,15 @@ ReplicationSlotCreate(const char *name, bool db_specific,
* Search for the named replication slot.
*
* Return the replication slot if found, otherwise NULL.
*
* The caller must hold ReplicationSlotControlLock in shared mode.
*/
ReplicationSlot *
SearchNamedReplicationSlot(const char *name)
SearchNamedReplicationSlot(const char *name, bool need_lock)
{
int i;
ReplicationSlot *slot = NULL;
ReplicationSlot *slot = NULL;
Assert(LWLockHeldByMeInMode(ReplicationSlotControlLock,
LW_SHARED));
if (need_lock)
LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
for (i = 0; i < max_replication_slots; i++)
{
......@@ -372,6 +365,9 @@ SearchNamedReplicationSlot(const char *name)
}
}
if (need_lock)
LWLockRelease(ReplicationSlotControlLock);
return slot;
}
......@@ -416,7 +412,7 @@ retry:
* Search for the slot with the specified name if the slot to acquire is
* not given. If the slot is not found, we either return -1 or error out.
*/
s = slot ? slot : SearchNamedReplicationSlot(name);
s = slot ? slot : SearchNamedReplicationSlot(name, false);
if (s == NULL || !s->in_use)
{
LWLockRelease(ReplicationSlotControlLock);
......@@ -713,6 +709,12 @@ ReplicationSlotDropPtr(ReplicationSlot *slot)
* reduce that possibility. If the messages reached in reverse, we would
* lose one statistics update message. But the next update message will
* create the statistics for the replication slot.
*
* XXX If the messages for both dropping and creating a slot of the same
* name get lost and the creation happens before (auto)vacuum cleans up the
* dead slot, the stats will be accumulated into the old slot. One can imagine
* having OIDs for each slot to avoid the accumulation of stats but that
* doesn't seem worth doing as in practice this won't happen frequently.
*/
if (SlotIsLogical(slot))
pgstat_report_replslot_drop(NameStr(slot->data.name));
......
......@@ -24,6 +24,7 @@
#include "pgstat.h"
#include "postmaster/bgworker_internals.h"
#include "postmaster/postmaster.h"
#include "replication/slot.h"
#include "storage/proc.h"
#include "storage/procarray.h"
#include "utils/acl.h"
......@@ -2207,8 +2208,33 @@ pg_stat_reset_replication_slot(PG_FUNCTION_ARGS)
char *target = NULL;
if (!PG_ARGISNULL(0))
{
ReplicationSlot *slot;
target = text_to_cstring(PG_GETARG_TEXT_PP(0));
/*
* Check if the slot exists with the given name. It is possible that
* by the time this message is executed the slot is dropped but at
* least this check will ensure that the given name is for a valid
* slot.
*/
slot = SearchNamedReplicationSlot(target, true);
if (!slot)
ereport(ERROR,
(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
errmsg("replication slot \"%s\" does not exist",
target)));
/*
* Nothing to do for physical slots as we collect stats only for
* logical slots.
*/
if (SlotIsPhysical(slot))
PG_RETURN_VOID();
}
pgstat_reset_replslot_counter(target);
PG_RETURN_VOID();
......@@ -2280,73 +2306,77 @@ pg_stat_get_archiver(PG_FUNCTION_ARGS)
PG_RETURN_DATUM(HeapTupleGetDatum(heap_form_tuple(tupdesc, values, nulls)));
}
/* Get the statistics for the replication slots */
/*
* Get the statistics for the replication slot. If the slot statistics are not
* available, return all-zero stats.
*/
Datum
pg_stat_get_replication_slots(PG_FUNCTION_ARGS)
pg_stat_get_replication_slot(PG_FUNCTION_ARGS)
{
#define PG_STAT_GET_REPLICATION_SLOT_COLS 10
ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
text *slotname_text = PG_GETARG_TEXT_P(0);
NameData slotname;
TupleDesc tupdesc;
Tuplestorestate *tupstore;
MemoryContext per_query_ctx;
MemoryContext oldcontext;
PgStat_ReplSlotStats *slotstats;
int nstats;
int i;
Datum values[10];
bool nulls[10];
PgStat_StatReplSlotEntry *slotent;
PgStat_StatReplSlotEntry allzero;
/* check to see if caller supports us returning a tuplestore */
if (rsinfo == NULL || !IsA(rsinfo, ReturnSetInfo))
ereport(ERROR,
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("set-valued function called in context that cannot accept a set")));
if (!(rsinfo->allowedModes & SFRM_Materialize))
ereport(ERROR,
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("materialize mode required, but it is not allowed in this context")));
/* Build a tuple descriptor for our result type */
if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE)
elog(ERROR, "return type must be a row type");
per_query_ctx = rsinfo->econtext->ecxt_per_query_memory;
oldcontext = MemoryContextSwitchTo(per_query_ctx);
tupstore = tuplestore_begin_heap(true, false, work_mem);
rsinfo->returnMode = SFRM_Materialize;
rsinfo->setResult = tupstore;
rsinfo->setDesc = tupdesc;
/* Initialise values and NULL flags arrays */
MemSet(values, 0, sizeof(values));
MemSet(nulls, 0, sizeof(nulls));
MemoryContextSwitchTo(oldcontext);
/* Initialise attributes information in the tuple descriptor */
tupdesc = CreateTemplateTupleDesc(PG_STAT_GET_REPLICATION_SLOT_COLS);
TupleDescInitEntry(tupdesc, (AttrNumber) 1, "slot_name",
TEXTOID, -1, 0);
TupleDescInitEntry(tupdesc, (AttrNumber) 2, "spill_txns",
INT8OID, -1, 0);
TupleDescInitEntry(tupdesc, (AttrNumber) 3, "spill_count",
INT8OID, -1, 0);
TupleDescInitEntry(tupdesc, (AttrNumber) 4, "spill_bytes",
INT8OID, -1, 0);
TupleDescInitEntry(tupdesc, (AttrNumber) 5, "stream_txns",
INT8OID, -1, 0);
TupleDescInitEntry(tupdesc, (AttrNumber) 6, "stream_count",
INT8OID, -1, 0);
TupleDescInitEntry(tupdesc, (AttrNumber) 7, "stream_bytes",
INT8OID, -1, 0);
TupleDescInitEntry(tupdesc, (AttrNumber) 8, "total_txns",
INT8OID, -1, 0);
TupleDescInitEntry(tupdesc, (AttrNumber) 9, "total_bytes",
INT8OID, -1, 0);
TupleDescInitEntry(tupdesc, (AttrNumber) 10, "stats_reset",
TIMESTAMPTZOID, -1, 0);
BlessTupleDesc(tupdesc);
slotstats = pgstat_fetch_replslot(&nstats);
for (i = 0; i < nstats; i++)
namestrcpy(&slotname, text_to_cstring(slotname_text));
slotent = pgstat_fetch_replslot(slotname);
if (!slotent)
{
Datum values[PG_STAT_GET_REPLICATION_SLOT_COLS];
bool nulls[PG_STAT_GET_REPLICATION_SLOT_COLS];
PgStat_ReplSlotStats *s = &(slotstats[i]);
MemSet(values, 0, sizeof(values));
MemSet(nulls, 0, sizeof(nulls));
values[0] = CStringGetTextDatum(NameStr(s->slotname));
values[1] = Int64GetDatum(s->spill_txns);
values[2] = Int64GetDatum(s->spill_count);
values[3] = Int64GetDatum(s->spill_bytes);
values[4] = Int64GetDatum(s->stream_txns);
values[5] = Int64GetDatum(s->stream_count);
values[6] = Int64GetDatum(s->stream_bytes);
values[7] = Int64GetDatum(s->total_txns);
values[8] = Int64GetDatum(s->total_bytes);
if (s->stat_reset_timestamp == 0)
nulls[9] = true;
else
values[9] = TimestampTzGetDatum(s->stat_reset_timestamp);
tuplestore_putvalues(tupstore, tupdesc, values, nulls);
/*
* If the slot is not found, initialise its stats. This is possible if
* the create slot message is lost.
*/
memset(&allzero, 0, sizeof(PgStat_StatReplSlotEntry));
slotent = &allzero;
}
tuplestore_donestoring(tupstore);
values[0] = CStringGetTextDatum(NameStr(slotname));
values[1] = Int64GetDatum(slotent->spill_txns);
values[2] = Int64GetDatum(slotent->spill_count);
values[3] = Int64GetDatum(slotent->spill_bytes);
values[4] = Int64GetDatum(slotent->stream_txns);
values[5] = Int64GetDatum(slotent->stream_count);
values[6] = Int64GetDatum(slotent->stream_bytes);
values[7] = Int64GetDatum(slotent->total_txns);
values[8] = Int64GetDatum(slotent->total_bytes);
return (Datum) 0;
if (slotent->stat_reset_timestamp == 0)
nulls[9] = true;
else
values[9] = TimestampTzGetDatum(slotent->stat_reset_timestamp);
/* Returns the record as Datum */
PG_RETURN_DATUM(HeapTupleGetDatum(heap_form_tuple(tupdesc, values, nulls)));
}
......@@ -53,6 +53,6 @@
*/
/* yyyymmddN */
#define CATALOG_VERSION_NO 202104231
#define CATALOG_VERSION_NO 202104271
#endif
......@@ -5308,14 +5308,14 @@
proargmodes => '{o,o,o,o,o,o,o,o,o,o,o,o,o,o,o}',
proargnames => '{pid,status,receive_start_lsn,receive_start_tli,written_lsn,flushed_lsn,received_tli,last_msg_send_time,last_msg_receipt_time,latest_end_lsn,latest_end_time,slot_name,sender_host,sender_port,conninfo}',
prosrc => 'pg_stat_get_wal_receiver' },
{ oid => '8595', descr => 'statistics: information about replication slots',
proname => 'pg_stat_get_replication_slots', prorows => '10',
{ oid => '8595', descr => 'statistics: information about replication slot',
proname => 'pg_stat_get_replication_slot', prorows => '1',
proisstrict => 'f', proretset => 't', provolatile => 's', proparallel => 'r',
prorettype => 'record', proargtypes => '',
proallargtypes => '{text,int8,int8,int8,int8,int8,int8,int8,int8,timestamptz}',
proargmodes => '{o,o,o,o,o,o,o,o,o,o}',
proargnames => '{slot_name,spill_txns,spill_count,spill_bytes,stream_txns,stream_count,stream_bytes,total_txns,total_bytes,stats_reset}',
prosrc => 'pg_stat_get_replication_slots' },
prorettype => 'record', proargtypes => 'text',
proallargtypes => '{text,text,int8,int8,int8,int8,int8,int8,int8,int8,timestamptz}',
proargmodes => '{i,o,o,o,o,o,o,o,o,o,o}',
proargnames => '{slot_name,slot_name,spill_txns,spill_count,spill_bytes,stream_txns,stream_count,stream_bytes,total_txns,total_bytes,stats_reset}',
prosrc => 'pg_stat_get_replication_slot' },
{ oid => '6118', descr => 'statistics: information about subscription',
proname => 'pg_stat_get_subscription', prorows => '10', proisstrict => 'f',
proretset => 't', provolatile => 's', proparallel => 'r',
......
......@@ -541,6 +541,7 @@ typedef struct PgStat_MsgReplSlot
{
PgStat_MsgHdr m_hdr;
NameData m_slotname;
bool m_create;
bool m_drop;
PgStat_Counter m_spill_txns;
PgStat_Counter m_spill_count;
......@@ -917,7 +918,7 @@ typedef struct PgStat_SLRUStats
/*
* Replication slot statistics kept in the stats collector
*/
typedef struct PgStat_ReplSlotStats
typedef struct PgStat_StatReplSlotEntry
{
NameData slotname;
PgStat_Counter spill_txns;
......@@ -929,7 +930,7 @@ typedef struct PgStat_ReplSlotStats
PgStat_Counter total_txns;
PgStat_Counter total_bytes;
TimestampTz stat_reset_timestamp;
} PgStat_ReplSlotStats;
} PgStat_StatReplSlotEntry;
/*
......@@ -1031,7 +1032,8 @@ extern void pgstat_report_recovery_conflict(int reason);
extern void pgstat_report_deadlock(void);
extern void pgstat_report_checksum_failures_in_db(Oid dboid, int failurecount);
extern void pgstat_report_checksum_failure(void);
extern void pgstat_report_replslot(const PgStat_ReplSlotStats *repSlotStat);
extern void pgstat_report_replslot(const PgStat_StatReplSlotEntry *repSlotStat);
extern void pgstat_report_replslot_create(const char *slotname);
extern void pgstat_report_replslot_drop(const char *slotname);
extern void pgstat_initialize(void);
......@@ -1129,7 +1131,7 @@ extern PgStat_ArchiverStats *pgstat_fetch_stat_archiver(void);
extern PgStat_GlobalStats *pgstat_fetch_global(void);
extern PgStat_WalStats *pgstat_fetch_stat_wal(void);
extern PgStat_SLRUStats *pgstat_fetch_slru(void);
extern PgStat_ReplSlotStats *pgstat_fetch_replslot(int *nslots_p);
extern PgStat_StatReplSlotEntry *pgstat_fetch_replslot(NameData slotname);
extern PgStat_RecoveryPrefetchStats *pgstat_fetch_recoveryprefetch(void);
extern void pgstat_count_slru_page_zeroed(int slru_idx);
......
......@@ -223,7 +223,7 @@ extern XLogRecPtr ReplicationSlotsComputeLogicalRestartLSN(void);
extern bool ReplicationSlotsCountDBSlots(Oid dboid, int *nslots, int *nactive);
extern void ReplicationSlotsDropDBSlots(Oid dboid);
extern void InvalidateObsoleteReplicationSlots(XLogSegNo oldestSegno);
extern ReplicationSlot *SearchNamedReplicationSlot(const char *name);
extern ReplicationSlot *SearchNamedReplicationSlot(const char *name, bool need_lock);
extern void ReplicationSlotNameForTablesync(Oid suboid, Oid relid, char *syncslotname, int szslot);
extern void ReplicationSlotDropAtPubNode(WalReceiverConn *wrconn, char *slotname, bool missing_ok);
......
......@@ -2071,7 +2071,9 @@ pg_stat_replication_slots| SELECT s.slot_name,
s.total_txns,
s.total_bytes,
s.stats_reset
FROM pg_stat_get_replication_slots() s(slot_name, spill_txns, spill_count, spill_bytes, stream_txns, stream_count, stream_bytes, total_txns, total_bytes, stats_reset);
FROM pg_replication_slots r,
LATERAL pg_stat_get_replication_slot((r.slot_name)::text) s(slot_name, spill_txns, spill_count, spill_bytes, stream_txns, stream_count, stream_bytes, total_txns, total_bytes, stats_reset)
WHERE (r.datoid IS NOT NULL);
pg_stat_slru| SELECT s.name,
s.blks_zeroed,
s.blks_hit,
......
......@@ -1870,12 +1870,12 @@ PgStat_MsgTabstat
PgStat_MsgTempFile
PgStat_MsgVacuum
PgStat_MsgWal
PgStat_ReplSlotStats
PgStat_SLRUStats
PgStat_Shared_Reset_Target
PgStat_Single_Reset_Type
PgStat_StatDBEntry
PgStat_StatFuncEntry
PgStat_StatReplSlotEntry
PgStat_StatTabEntry
PgStat_SubXactStatus
PgStat_TableCounts
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment