Commit 97903c3d authored by Tom Lane's avatar Tom Lane

Fix a performance problem in databases with large numbers of tables

(or other types of pg_class entry): the function pgstat_vacuum_tabstat,
invoked during VACUUM startup, had runtime proportional to the number of
stats table entries times the number of pg_class rows; in other words
O(N^2) if the stats collector's information is reasonably complete.
Replace list searching with a hash table to bring it back to O(N)
behavior.  Per report from kim at myemma.com.

Back-patch as far as 8.1; 8.0 and before use different coding here.
parent 87f6d641
......@@ -13,7 +13,7 @@
*
* Copyright (c) 2001-2007, PostgreSQL Global Development Group
*
* $PostgreSQL: pgsql/src/backend/postmaster/pgstat.c,v 1.142 2007/01/05 22:19:36 momjian Exp $
* $PostgreSQL: pgsql/src/backend/postmaster/pgstat.c,v 1.143 2007/01/11 23:06:03 tgl Exp $
* ----------
*/
#include "postgres.h"
......@@ -159,6 +159,7 @@ static void pgstat_write_statsfile(void);
static void pgstat_read_statsfile(HTAB **dbhash, Oid onlydb);
static void backend_read_statsfile(void);
static void pgstat_read_current_status(void);
static HTAB *pgstat_collect_oids(Oid catalogid);
static void pgstat_setheader(PgStat_MsgHdr *hdr, StatMsgType mtype);
static void pgstat_send(void *msg, int len);
......@@ -657,10 +658,7 @@ pgstat_report_tabstat(void)
void
pgstat_vacuum_tabstat(void)
{
List *oidlist;
Relation rel;
HeapScanDesc scan;
HeapTuple tup;
HTAB *htab;
PgStat_MsgTabpurge msg;
HASH_SEQ_STATUS hstat;
PgStat_StatDBEntry *dbentry;
......@@ -679,15 +677,7 @@ pgstat_vacuum_tabstat(void)
/*
* Read pg_database and make a list of OIDs of all existing databases
*/
oidlist = NIL;
rel = heap_open(DatabaseRelationId, AccessShareLock);
scan = heap_beginscan(rel, SnapshotNow, 0, NULL);
while ((tup = heap_getnext(scan, ForwardScanDirection)) != NULL)
{
oidlist = lappend_oid(oidlist, HeapTupleGetOid(tup));
}
heap_endscan(scan);
heap_close(rel, AccessShareLock);
htab = pgstat_collect_oids(DatabaseRelationId);
/*
* Search the database hash table for dead databases and tell the
......@@ -698,12 +688,14 @@ pgstat_vacuum_tabstat(void)
{
Oid dbid = dbentry->databaseid;
if (!list_member_oid(oidlist, dbid))
CHECK_FOR_INTERRUPTS();
if (hash_search(htab, (void *) &dbid, HASH_FIND, NULL) == NULL)
pgstat_drop_database(dbid);
}
/* Clean up */
list_free(oidlist);
hash_destroy(htab);
/*
* Lookup our own database entry; if not found, nothing more to do.
......@@ -717,15 +709,7 @@ pgstat_vacuum_tabstat(void)
/*
* Similarly to above, make a list of all known relations in this DB.
*/
oidlist = NIL;
rel = heap_open(RelationRelationId, AccessShareLock);
scan = heap_beginscan(rel, SnapshotNow, 0, NULL);
while ((tup = heap_getnext(scan, ForwardScanDirection)) != NULL)
{
oidlist = lappend_oid(oidlist, HeapTupleGetOid(tup));
}
heap_endscan(scan);
heap_close(rel, AccessShareLock);
htab = pgstat_collect_oids(RelationRelationId);
/*
* Initialize our messages table counter to zero
......@@ -738,13 +722,17 @@ pgstat_vacuum_tabstat(void)
hash_seq_init(&hstat, dbentry->tables);
while ((tabentry = (PgStat_StatTabEntry *) hash_seq_search(&hstat)) != NULL)
{
if (list_member_oid(oidlist, tabentry->tableid))
Oid tabid = tabentry->tableid;
CHECK_FOR_INTERRUPTS();
if (hash_search(htab, (void *) &tabid, HASH_FIND, NULL) != NULL)
continue;
/*
* Not there, so add this table's Oid to the message
*/
msg.m_tableid[msg.m_nentries++] = tabentry->tableid;
msg.m_tableid[msg.m_nentries++] = tabid;
/*
* If the message is full, send it out and reinitialize to empty
......@@ -776,7 +764,50 @@ pgstat_vacuum_tabstat(void)
}
/* Clean up */
list_free(oidlist);
hash_destroy(htab);
}
/* ----------
* pgstat_collect_oids() -
*
* Collect the OIDs of either all databases or all tables, according to
* the parameter, into a temporary hash table. Caller should hash_destroy
* the result when done with it.
* ----------
*/
static HTAB *
pgstat_collect_oids(Oid catalogid)
{
HTAB *htab;
HASHCTL hash_ctl;
Relation rel;
HeapScanDesc scan;
HeapTuple tup;
memset(&hash_ctl, 0, sizeof(hash_ctl));
hash_ctl.keysize = sizeof(Oid);
hash_ctl.entrysize = sizeof(Oid);
hash_ctl.hash = oid_hash;
htab = hash_create("Temporary table of OIDs",
PGSTAT_TAB_HASH_SIZE,
&hash_ctl,
HASH_ELEM | HASH_FUNCTION);
rel = heap_open(catalogid, AccessShareLock);
scan = heap_beginscan(rel, SnapshotNow, 0, NULL);
while ((tup = heap_getnext(scan, ForwardScanDirection)) != NULL)
{
Oid thisoid = HeapTupleGetOid(tup);
CHECK_FOR_INTERRUPTS();
(void) hash_search(htab, (void *) &thisoid, HASH_ENTER, NULL);
}
heap_endscan(scan);
heap_close(rel, AccessShareLock);
return htab;
}
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment