Commit 187492b6 authored by Alvaro Herrera's avatar Alvaro Herrera

Split pgstat file in smaller pieces

We now write one file per database and one global file, instead of
having the whole thing in a single huge file.  This reduces the I/O that
must be done when partial data is required -- which is all the time,
because each process only needs information on its own database anyway.
Also, the autovacuum launcher does not need data about tables and
functions in each database; having the global stats for all DBs is
enough.

Catalog version bumped because we have a new subdir under PGDATA.

Author: Tomas Vondra.  Some rework by Álvaro
Testing by Jeff Janes
Other discussion by Heikki Linnakangas, Tom Lane.
parent 9475db3a
......@@ -38,6 +38,7 @@
#include "access/xact.h"
#include "catalog/pg_database.h"
#include "catalog/pg_proc.h"
#include "lib/ilist.h"
#include "libpq/ip.h"
#include "libpq/libpq.h"
#include "libpq/pqsignal.h"
......@@ -66,8 +67,9 @@
* Paths for the statistics files (relative to installation's $PGDATA).
* ----------
*/
#define PGSTAT_STAT_PERMANENT_FILENAME "global/pgstat.stat"
#define PGSTAT_STAT_PERMANENT_TMPFILE "global/pgstat.tmp"
#define PGSTAT_STAT_PERMANENT_DIRECTORY "pg_stat"
#define PGSTAT_STAT_PERMANENT_FILENAME "pg_stat/global.stat"
#define PGSTAT_STAT_PERMANENT_TMPFILE "pg_stat/global.tmp"
/* ----------
* Timer definitions.
......@@ -115,6 +117,7 @@ int pgstat_track_activity_query_size = 1024;
* Built from GUC parameter
* ----------
*/
char *pgstat_stat_directory = NULL;
char *pgstat_stat_filename = NULL;
char *pgstat_stat_tmpname = NULL;
......@@ -219,11 +222,16 @@ static int localNumBackends = 0;
*/
static PgStat_GlobalStats globalStats;
/* Last time the collector successfully wrote the stats file */
static TimestampTz last_statwrite;
/* Write request info for each database */
typedef struct DBWriteRequest
{
Oid databaseid; /* OID of the database to write */
TimestampTz request_time; /* timestamp of the last write request */
slist_node next;
} DBWriteRequest;
/* Latest statistics request time from backends */
static TimestampTz last_statrequest;
/* Latest statistics request times from backends */
static slist_head last_statrequests = SLIST_STATIC_INIT(last_statrequests);
static volatile bool need_exit = false;
static volatile bool got_SIGHUP = false;
......@@ -252,11 +260,16 @@ static void pgstat_sighup_handler(SIGNAL_ARGS);
static PgStat_StatDBEntry *pgstat_get_db_entry(Oid databaseid, bool create);
static PgStat_StatTabEntry *pgstat_get_tab_entry(PgStat_StatDBEntry *dbentry,
Oid tableoid, bool create);
static void pgstat_write_statsfile(bool permanent);
static HTAB *pgstat_read_statsfile(Oid onlydb, bool permanent);
static void pgstat_write_statsfiles(bool permanent, bool allDbs);
static void pgstat_write_db_statsfile(PgStat_StatDBEntry *dbentry, bool permanent);
static HTAB *pgstat_read_statsfiles(Oid onlydb, bool permanent, bool deep);
static void pgstat_read_db_statsfile(Oid databaseid, HTAB *tabhash, HTAB *funchash, bool permanent);
static void backend_read_statsfile(void);
static void pgstat_read_current_status(void);
static bool pgstat_write_statsfile_needed(void);
static bool pgstat_db_requested(Oid databaseid);
static void pgstat_send_tabstat(PgStat_MsgTabstat *tsmsg);
static void pgstat_send_funcstats(void);
static HTAB *pgstat_collect_oids(Oid catalogid);
......@@ -285,7 +298,6 @@ static void pgstat_recv_recoveryconflict(PgStat_MsgRecoveryConflict *msg, int le
static void pgstat_recv_deadlock(PgStat_MsgDeadlock *msg, int len);
static void pgstat_recv_tempfile(PgStat_MsgTempFile *msg, int len);
/* ------------------------------------------------------------
* Public functions called from postmaster follow
* ------------------------------------------------------------
......@@ -540,17 +552,42 @@ startup_failed:
SetConfigOption("track_counts", "off", PGC_INTERNAL, PGC_S_OVERRIDE);
}
/*
* subroutine for pgstat_reset_all
*/
static void
pgstat_reset_remove_files(const char *directory)
{
DIR *dir;
struct dirent *entry;
char fname[MAXPGPATH];
dir = AllocateDir(pgstat_stat_directory);
while ((entry = ReadDir(dir, pgstat_stat_directory)) != NULL)
{
if (strcmp(entry->d_name, ".") == 0 || strcmp(entry->d_name, "..") == 0)
continue;
/* XXX should we try to ignore files other than the ones we write? */
snprintf(fname, MAXPGPATH, "%s/%s", pgstat_stat_directory,
entry->d_name);
unlink(fname);
}
FreeDir(dir);
}
/*
* pgstat_reset_all() -
*
* Remove the stats file. This is currently used only if WAL
* Remove the stats files. This is currently used only if WAL
* recovery is needed after a crash.
*/
void
pgstat_reset_all(void)
{
unlink(pgstat_stat_filename);
unlink(PGSTAT_STAT_PERMANENT_FILENAME);
pgstat_reset_remove_files(pgstat_stat_directory);
pgstat_reset_remove_files(PGSTAT_STAT_PERMANENT_DIRECTORY);
}
#ifdef EXEC_BACKEND
......@@ -1408,13 +1445,14 @@ pgstat_ping(void)
* ----------
*/
static void
pgstat_send_inquiry(TimestampTz clock_time, TimestampTz cutoff_time)
pgstat_send_inquiry(TimestampTz clock_time, TimestampTz cutoff_time, Oid databaseid)
{
PgStat_MsgInquiry msg;
pgstat_setheader(&msg.m_hdr, PGSTAT_MTYPE_INQUIRY);
msg.clock_time = clock_time;
msg.cutoff_time = cutoff_time;
msg.databaseid = databaseid;
pgstat_send(&msg, sizeof(msg));
}
......@@ -3052,18 +3090,12 @@ PgstatCollectorMain(int argc, char *argv[])
*/
init_ps_display("stats collector process", "", "", "");
/*
* Arrange to write the initial status file right away
*/
last_statrequest = GetCurrentTimestamp();
last_statwrite = last_statrequest - 1;
/*
* Read in an existing statistics stats file or initialize the stats to
* zero.
*/
pgStatRunningInCollector = true;
pgStatDBHash = pgstat_read_statsfile(InvalidOid, true);
pgStatDBHash = pgstat_read_statsfiles(InvalidOid, true, true);
/*
* Loop to process messages until we get SIGQUIT or detect ungraceful
......@@ -3109,8 +3141,8 @@ PgstatCollectorMain(int argc, char *argv[])
* Write the stats file if a new request has arrived that is not
* satisfied by existing file.
*/
if (last_statwrite < last_statrequest)
pgstat_write_statsfile(false);
if (pgstat_write_statsfile_needed())
pgstat_write_statsfiles(false, false);
/*
* Try to receive and process a message. This will not block,
......@@ -3269,7 +3301,7 @@ PgstatCollectorMain(int argc, char *argv[])
/*
* Save the final stats to reuse at next startup.
*/
pgstat_write_statsfile(true);
pgstat_write_statsfiles(true, true);
exit(0);
}
......@@ -3299,6 +3331,57 @@ pgstat_sighup_handler(SIGNAL_ARGS)
errno = save_errno;
}
/*
* Subroutine to clear stats in a database entry
*
* Tables and functions hashes are initialized to empty.
*/
static void
reset_dbentry_counters(PgStat_StatDBEntry *dbentry)
{
HASHCTL hash_ctl;
dbentry->n_xact_commit = 0;
dbentry->n_xact_rollback = 0;
dbentry->n_blocks_fetched = 0;
dbentry->n_blocks_hit = 0;
dbentry->n_tuples_returned = 0;
dbentry->n_tuples_fetched = 0;
dbentry->n_tuples_inserted = 0;
dbentry->n_tuples_updated = 0;
dbentry->n_tuples_deleted = 0;
dbentry->last_autovac_time = 0;
dbentry->n_conflict_tablespace = 0;
dbentry->n_conflict_lock = 0;
dbentry->n_conflict_snapshot = 0;
dbentry->n_conflict_bufferpin = 0;
dbentry->n_conflict_startup_deadlock = 0;
dbentry->n_temp_files = 0;
dbentry->n_temp_bytes = 0;
dbentry->n_deadlocks = 0;
dbentry->n_block_read_time = 0;
dbentry->n_block_write_time = 0;
dbentry->stat_reset_timestamp = GetCurrentTimestamp();
dbentry->stats_timestamp = 0;
memset(&hash_ctl, 0, sizeof(hash_ctl));
hash_ctl.keysize = sizeof(Oid);
hash_ctl.entrysize = sizeof(PgStat_StatTabEntry);
hash_ctl.hash = oid_hash;
dbentry->tables = hash_create("Per-database table",
PGSTAT_TAB_HASH_SIZE,
&hash_ctl,
HASH_ELEM | HASH_FUNCTION);
hash_ctl.keysize = sizeof(Oid);
hash_ctl.entrysize = sizeof(PgStat_StatFuncEntry);
hash_ctl.hash = oid_hash;
dbentry->functions = hash_create("Per-database function",
PGSTAT_FUNCTION_HASH_SIZE,
&hash_ctl,
HASH_ELEM | HASH_FUNCTION);
}
/*
* Lookup the hash table entry for the specified database. If no hash
......@@ -3320,53 +3403,12 @@ pgstat_get_db_entry(Oid databaseid, bool create)
if (!create && !found)
return NULL;
/* If not found, initialize the new one. */
/*
* If not found, initialize the new one. This creates empty hash tables
* for tables and functions, too.
*/
if (!found)
{
HASHCTL hash_ctl;
result->tables = NULL;
result->functions = NULL;
result->n_xact_commit = 0;
result->n_xact_rollback = 0;
result->n_blocks_fetched = 0;
result->n_blocks_hit = 0;
result->n_tuples_returned = 0;
result->n_tuples_fetched = 0;
result->n_tuples_inserted = 0;
result->n_tuples_updated = 0;
result->n_tuples_deleted = 0;
result->last_autovac_time = 0;
result->n_conflict_tablespace = 0;
result->n_conflict_lock = 0;
result->n_conflict_snapshot = 0;
result->n_conflict_bufferpin = 0;
result->n_conflict_startup_deadlock = 0;
result->n_temp_files = 0;
result->n_temp_bytes = 0;
result->n_deadlocks = 0;
result->n_block_read_time = 0;
result->n_block_write_time = 0;
result->stat_reset_timestamp = GetCurrentTimestamp();
memset(&hash_ctl, 0, sizeof(hash_ctl));
hash_ctl.keysize = sizeof(Oid);
hash_ctl.entrysize = sizeof(PgStat_StatTabEntry);
hash_ctl.hash = oid_hash;
result->tables = hash_create("Per-database table",
PGSTAT_TAB_HASH_SIZE,
&hash_ctl,
HASH_ELEM | HASH_FUNCTION);
hash_ctl.keysize = sizeof(Oid);
hash_ctl.entrysize = sizeof(PgStat_StatFuncEntry);
hash_ctl.hash = oid_hash;
result->functions = hash_create("Per-database function",
PGSTAT_FUNCTION_HASH_SIZE,
&hash_ctl,
HASH_ELEM | HASH_FUNCTION);
}
reset_dbentry_counters(result);
return result;
}
......@@ -3422,30 +3464,32 @@ pgstat_get_tab_entry(PgStat_StatDBEntry *dbentry, Oid tableoid, bool create)
/* ----------
* pgstat_write_statsfile() -
* pgstat_write_statsfiles() -
* Write the global statistics file, as well as requested DB files.
*
* Tell the news.
* If writing to the permanent file (happens when the collector is
* shutting down only), remove the temporary file so that backends
* If writing to the permanent files (happens when the collector is
* shutting down only), remove the temporary files so that backends
* starting up under a new postmaster can't read the old data before
* the new collector is ready.
*
* When 'allDbs' is false, only the requested databases (listed in
* last_statrequests) will be written; otherwise, all databases will be
* written.
* ----------
*/
static void
pgstat_write_statsfile(bool permanent)
pgstat_write_statsfiles(bool permanent, bool allDbs)
{
HASH_SEQ_STATUS hstat;
HASH_SEQ_STATUS tstat;
HASH_SEQ_STATUS fstat;
PgStat_StatDBEntry *dbentry;
PgStat_StatTabEntry *tabentry;
PgStat_StatFuncEntry *funcentry;
FILE *fpout;
int32 format_id;
const char *tmpfile = permanent ? PGSTAT_STAT_PERMANENT_TMPFILE : pgstat_stat_tmpname;
const char *statfile = permanent ? PGSTAT_STAT_PERMANENT_FILENAME : pgstat_stat_filename;
int rc;
elog(DEBUG2, "writing statsfile '%s'", statfile);
/*
* Open the statistics temp file to write out the current values.
*/
......@@ -3484,13 +3528,150 @@ pgstat_write_statsfile(bool permanent)
while ((dbentry = (PgStat_StatDBEntry *) hash_seq_search(&hstat)) != NULL)
{
/*
* Write out the DB entry including the number of live backends. We
* don't write the tables or functions pointers, since they're of no
* use to any other process.
* Write out the tables and functions into the DB stat file, if
* required.
*
* We need to do this before the dbentry write, to ensure the
* timestamps written to both are consistent.
*/
if (allDbs || pgstat_db_requested(dbentry->databaseid))
{
dbentry->stats_timestamp = globalStats.stats_timestamp;
pgstat_write_db_statsfile(dbentry, permanent);
}
/*
* Write out the DB entry. We don't write the tables or functions
* pointers, since they're of no use to any other process.
*/
fputc('D', fpout);
rc = fwrite(dbentry, offsetof(PgStat_StatDBEntry, tables), 1, fpout);
(void) rc; /* we'll check for error with ferror */
}
/*
* No more output to be done. Close the temp file and replace the old
* pgstat.stat with it. The ferror() check replaces testing for error
* after each individual fputc or fwrite above.
*/
fputc('E', fpout);
if (ferror(fpout))
{
ereport(LOG,
(errcode_for_file_access(),
errmsg("could not write temporary statistics file \"%s\": %m",
tmpfile)));
FreeFile(fpout);
unlink(tmpfile);
}
else if (FreeFile(fpout) < 0)
{
ereport(LOG,
(errcode_for_file_access(),
errmsg("could not close temporary statistics file \"%s\": %m",
tmpfile)));
unlink(tmpfile);
}
else if (rename(tmpfile, statfile) < 0)
{
ereport(LOG,
(errcode_for_file_access(),
errmsg("could not rename temporary statistics file \"%s\" to \"%s\": %m",
tmpfile, statfile)));
unlink(tmpfile);
}
if (permanent)
unlink(pgstat_stat_filename);
/*
* Now throw away the list of requests. Note that requests sent after we
* started the write are still waiting on the network socket.
*/
if (!slist_is_empty(&last_statrequests))
{
slist_mutable_iter iter;
slist_foreach_modify(iter, &last_statrequests)
{
DBWriteRequest *req;
req = slist_container(DBWriteRequest, next, iter.cur);
pfree(req);
}
slist_init(&last_statrequests);
}
}
/*
* return the filename for a DB stat file; filename is the output buffer,
* of length len.
*/
static void
get_dbstat_filename(bool permanent, bool tempname, Oid databaseid,
char *filename, int len)
{
int printed;
printed = snprintf(filename, len, "%s/db_%u.%s",
permanent ? PGSTAT_STAT_PERMANENT_DIRECTORY :
pgstat_stat_directory,
databaseid,
tempname ? "tmp" : "stat");
if (printed > len)
elog(ERROR, "overlength pgstat path");
}
/* ----------
* pgstat_write_db_statsfile() -
* Write the stat file for a single database.
*
* If writing to the permanent file (happens when the collector is
* shutting down only), remove the temporary file so that backends
* starting up under a new postmaster can't read the old data before
* the new collector is ready.
* ----------
*/
static void
pgstat_write_db_statsfile(PgStat_StatDBEntry *dbentry, bool permanent)
{
HASH_SEQ_STATUS tstat;
HASH_SEQ_STATUS fstat;
PgStat_StatTabEntry *tabentry;
PgStat_StatFuncEntry *funcentry;
FILE *fpout;
int32 format_id;
Oid dbid = dbentry->databaseid;
int rc;
char tmpfile[MAXPGPATH];
char statfile[MAXPGPATH];
get_dbstat_filename(permanent, true, dbid, tmpfile, MAXPGPATH);
get_dbstat_filename(permanent, false, dbid, statfile, MAXPGPATH);
elog(DEBUG2, "writing statsfile '%s'", statfile);
/*
* Open the statistics temp file to write out the current values.
*/
fpout = AllocateFile(tmpfile, PG_BINARY_W);
if (fpout == NULL)
{
ereport(LOG,
(errcode_for_file_access(),
errmsg("could not open temporary statistics file \"%s\": %m",
tmpfile)));
return;
}
/*
* Write the file header --- currently just a format ID.
*/
format_id = PGSTAT_FILE_FORMAT_ID;
rc = fwrite(&format_id, sizeof(format_id), 1, fpout);
(void) rc; /* we'll check for error with ferror */
/*
* Walk through the database's access stats per table.
......@@ -3514,12 +3695,6 @@ pgstat_write_statsfile(bool permanent)
(void) rc; /* we'll check for error with ferror */
}
/*
* Mark the end of this DB
*/
fputc('d', fpout);
}
/*
* No more output to be done. Close the temp file and replace the old
* pgstat.stat with it. The ferror() check replaces testing for error
......@@ -3552,61 +3727,36 @@ pgstat_write_statsfile(bool permanent)
tmpfile, statfile)));
unlink(tmpfile);
}
else
{
/*
* Successful write, so update last_statwrite.
*/
last_statwrite = globalStats.stats_timestamp;
/*
* If there is clock skew between backends and the collector, we could
* receive a stats request time that's in the future. If so, complain
* and reset last_statrequest. Resetting ensures that no inquiry
* message can cause more than one stats file write to occur.
*/
if (last_statrequest > last_statwrite)
if (permanent)
{
char *reqtime;
char *mytime;
/* Copy because timestamptz_to_str returns a static buffer */
reqtime = pstrdup(timestamptz_to_str(last_statrequest));
mytime = pstrdup(timestamptz_to_str(last_statwrite));
elog(LOG, "last_statrequest %s is later than collector's time %s",
reqtime, mytime);
pfree(reqtime);
pfree(mytime);
get_dbstat_filename(false, false, dbid, statfile, MAXPGPATH);
last_statrequest = last_statwrite;
}
elog(DEBUG2, "removing temporary stat file '%s'", statfile);
unlink(statfile);
}
if (permanent)
unlink(pgstat_stat_filename);
}
/* ----------
* pgstat_read_statsfile() -
* pgstat_read_statsfiles() -
*
* Reads in an existing statistics collector file and initializes the
* databases' hash table (whose entries point to the tables' hash tables).
* Reads in the existing statistics collector files and initializes the
* databases' hash table. If the permanent file name is requested (which
* only happens in the stats collector itself), also remove the file after
* reading; the in-memory status is now authoritative, and the permanent file
* would be out of date in case somebody else reads it.
*
* If a deep read is requested, table/function stats are read also, otherwise
* the table/function hash tables remain empty.
* ----------
*/
static HTAB *
pgstat_read_statsfile(Oid onlydb, bool permanent)
pgstat_read_statsfiles(Oid onlydb, bool permanent, bool deep)
{
PgStat_StatDBEntry *dbentry;
PgStat_StatDBEntry dbbuf;
PgStat_StatTabEntry *tabentry;
PgStat_StatTabEntry tabbuf;
PgStat_StatFuncEntry funcbuf;
PgStat_StatFuncEntry *funcentry;
HASHCTL hash_ctl;
HTAB *dbhash;
HTAB *tabhash = NULL;
HTAB *funchash = NULL;
FILE *fpin;
int32 format_id;
bool found;
......@@ -3641,7 +3791,7 @@ pgstat_read_statsfile(Oid onlydb, bool permanent)
globalStats.stat_reset_timestamp = GetCurrentTimestamp();
/*
* Try to open the status file. If it doesn't exist, the backends simply
* Try to open the stats file. If it doesn't exist, the backends simply
* return zero for anything and the collector simply starts from scratch
* with empty counters.
*
......@@ -3662,8 +3812,8 @@ pgstat_read_statsfile(Oid onlydb, bool permanent)
/*
* Verify it's of the expected format.
*/
if (fread(&format_id, 1, sizeof(format_id), fpin) != sizeof(format_id)
|| format_id != PGSTAT_FILE_FORMAT_ID)
if (fread(&format_id, 1, sizeof(format_id), fpin) != sizeof(format_id) ||
format_id != PGSTAT_FILE_FORMAT_ID)
{
ereport(pgStatRunningInCollector ? LOG : WARNING,
(errmsg("corrupted statistics file \"%s\"", statfile)));
......@@ -3690,8 +3840,7 @@ pgstat_read_statsfile(Oid onlydb, bool permanent)
{
/*
* 'D' A PgStat_StatDBEntry struct describing a database
* follows. Subsequently, zero to many 'T' and 'F' entries
* will follow until a 'd' is encountered.
* follows.
*/
case 'D':
if (fread(&dbbuf, 1, offsetof(PgStat_StatDBEntry, tables),
......@@ -3753,21 +3902,107 @@ pgstat_read_statsfile(Oid onlydb, bool permanent)
HASH_ELEM | HASH_FUNCTION | HASH_CONTEXT);
/*
* Arrange that following records add entries to this
* database's hash tables.
* If requested, read the data from the database-specific
* file. If there was onlydb specified (!= InvalidOid), we
* would not get here because of a break above. So we don't
* need to recheck.
*/
tabhash = dbentry->tables;
funchash = dbentry->functions;
if (deep)
pgstat_read_db_statsfile(dbentry->databaseid,
dbentry->tables,
dbentry->functions,
permanent);
break;
case 'E':
goto done;
default:
ereport(pgStatRunningInCollector ? LOG : WARNING,
(errmsg("corrupted statistics file \"%s\"",
statfile)));
goto done;
}
}
done:
FreeFile(fpin);
/* If requested to read the permanent file, also get rid of it. */
if (permanent)
{
elog(DEBUG2, "removing permanent stats file '%s'", statfile);
unlink(statfile);
}
return dbhash;
}
/* ----------
* pgstat_read_db_statsfile() -
*
* Reads in the existing statistics collector file for the given database,
* and initializes the tables and functions hash tables.
*
* As pgstat_read_statsfiles, if the permanent file is requested, it is
* removed after reading.
* ----------
*/
static void
pgstat_read_db_statsfile(Oid databaseid, HTAB *tabhash, HTAB *funchash,
bool permanent)
{
PgStat_StatTabEntry *tabentry;
PgStat_StatTabEntry tabbuf;
PgStat_StatFuncEntry funcbuf;
PgStat_StatFuncEntry *funcentry;
FILE *fpin;
int32 format_id;
bool found;
char statfile[MAXPGPATH];
get_dbstat_filename(permanent, false, databaseid, statfile, MAXPGPATH);
/*
* Try to open the stats file. If it doesn't exist, the backends simply
* return zero for anything and the collector simply starts from scratch
* with empty counters.
*
* ENOENT is a possibility if the stats collector is not running or has
* not yet written the stats file the first time. Any other failure
* condition is suspicious.
*/
if ((fpin = AllocateFile(statfile, PG_BINARY_R)) == NULL)
{
if (errno != ENOENT)
ereport(pgStatRunningInCollector ? LOG : WARNING,
(errcode_for_file_access(),
errmsg("could not open statistics file \"%s\": %m",
statfile)));
return;
}
/*
* 'd' End of this database.
* Verify it's of the expected format.
*/
case 'd':
tabhash = NULL;
funchash = NULL;
break;
if (fread(&format_id, 1, sizeof(format_id), fpin) != sizeof(format_id) ||
format_id != PGSTAT_FILE_FORMAT_ID)
{
ereport(pgStatRunningInCollector ? LOG : WARNING,
(errmsg("corrupted statistics file \"%s\"", statfile)));
goto done;
}
/*
* We found an existing collector stats file. Read it and put all the
* hashtable entries into place.
*/
for (;;)
{
switch (fgetc(fpin))
{
/*
* 'T' A PgStat_StatTabEntry follows.
*/
......@@ -3854,29 +4089,44 @@ done:
FreeFile(fpin);
if (permanent)
unlink(PGSTAT_STAT_PERMANENT_FILENAME);
{
elog(DEBUG2, "removing permanent stats file '%s'", statfile);
unlink(statfile);
}
return dbhash;
return;
}
/* ----------
* pgstat_read_statsfile_timestamp() -
* pgstat_read_db_statsfile_timestamp() -
*
* Attempt to fetch the timestamp of an existing stats file.
* Returns TRUE if successful (timestamp is stored at *ts).
* Attempt to determine the timestamp of the last db statfile write.
* Returns TRUE if successful; the timestamp is stored in *ts.
*
* This needs to be careful about handling databases for which no stats file
* exists, such as databases without a stat entry or those not yet written:
*
* - if there's a database entry in the global file, return the corresponding
* stats_timestamp value.
*
* - if there's no db stat entry (e.g. for a new or inactive database),
* there's no stat_timestamp value, but also nothing to write so we return
* the timestamp of the global statfile.
* ----------
*/
static bool
pgstat_read_statsfile_timestamp(bool permanent, TimestampTz *ts)
pgstat_read_db_statsfile_timestamp(Oid databaseid, bool permanent,
TimestampTz *ts)
{
PgStat_StatDBEntry dbentry;
PgStat_GlobalStats myGlobalStats;
FILE *fpin;
int32 format_id;
const char *statfile = permanent ? PGSTAT_STAT_PERMANENT_FILENAME : pgstat_stat_filename;
/*
* Try to open the status file. As above, anything but ENOENT is worthy
* of complaining about.
* Try to open the stats file. As above, anything but ENOENT is worthy of
* complaining about.
*/
if ((fpin = AllocateFile(statfile, PG_BINARY_R)) == NULL)
{
......@@ -3891,8 +4141,8 @@ pgstat_read_statsfile_timestamp(bool permanent, TimestampTz *ts)
/*
* Verify it's of the expected format.
*/
if (fread(&format_id, 1, sizeof(format_id), fpin) != sizeof(format_id)
|| format_id != PGSTAT_FILE_FORMAT_ID)
if (fread(&format_id, 1, sizeof(format_id), fpin) != sizeof(format_id) ||
format_id != PGSTAT_FILE_FORMAT_ID)
{
ereport(pgStatRunningInCollector ? LOG : WARNING,
(errmsg("corrupted statistics file \"%s\"", statfile)));
......@@ -3903,7 +4153,8 @@ pgstat_read_statsfile_timestamp(bool permanent, TimestampTz *ts)
/*
* Read global stats struct
*/
if (fread(&myGlobalStats, 1, sizeof(myGlobalStats), fpin) != sizeof(myGlobalStats))
if (fread(&myGlobalStats, 1, sizeof(myGlobalStats),
fpin) != sizeof(myGlobalStats))
{
ereport(pgStatRunningInCollector ? LOG : WARNING,
(errmsg("corrupted statistics file \"%s\"", statfile)));
......@@ -3911,8 +4162,55 @@ pgstat_read_statsfile_timestamp(bool permanent, TimestampTz *ts)
return false;
}
/* By default, we're going to return the timestamp of the global file. */
*ts = myGlobalStats.stats_timestamp;
/*
* We found an existing collector stats file. Read it and look for a
* record for the requested database. If found, use its timestamp.
*/
for (;;)
{
switch (fgetc(fpin))
{
/*
* 'D' A PgStat_StatDBEntry struct describing a database
* follows.
*/
case 'D':
if (fread(&dbentry, 1, offsetof(PgStat_StatDBEntry, tables),
fpin) != offsetof(PgStat_StatDBEntry, tables))
{
ereport(pgStatRunningInCollector ? LOG : WARNING,
(errmsg("corrupted statistics file \"%s\"",
statfile)));
goto done;
}
/*
* If this is the DB we're looking for, save its timestamp and
* we're done.
*/
if (dbentry.databaseid == databaseid)
{
*ts = dbentry.stats_timestamp;
goto done;
}
break;
case 'E':
goto done;
default:
ereport(pgStatRunningInCollector ? LOG : WARNING,
(errmsg("corrupted statistics file \"%s\"",
statfile)));
goto done;
}
}
done:
FreeFile(fpin);
return true;
}
......@@ -3947,7 +4245,7 @@ backend_read_statsfile(void)
CHECK_FOR_INTERRUPTS();
ok = pgstat_read_statsfile_timestamp(false, &file_ts);
ok = pgstat_read_db_statsfile_timestamp(MyDatabaseId, false, &file_ts);
cur_ts = GetCurrentTimestamp();
/* Calculate min acceptable timestamp, if we didn't already */
......@@ -3956,9 +4254,9 @@ backend_read_statsfile(void)
/*
* We set the minimum acceptable timestamp to PGSTAT_STAT_INTERVAL
* msec before now. This indirectly ensures that the collector
* needn't write the file more often than PGSTAT_STAT_INTERVAL.
* In an autovacuum worker, however, we want a lower delay to
* avoid using stale data, so we use PGSTAT_RETRY_DELAY (since the
* needn't write the file more often than PGSTAT_STAT_INTERVAL. In
* an autovacuum worker, however, we want a lower delay to avoid
* using stale data, so we use PGSTAT_RETRY_DELAY (since the
* number of workers is low, this shouldn't be a problem).
*
* We don't recompute min_ts after sleeping, except in the
......@@ -4006,7 +4304,7 @@ backend_read_statsfile(void)
pfree(mytime);
}
pgstat_send_inquiry(cur_ts, min_ts);
pgstat_send_inquiry(cur_ts, min_ts, MyDatabaseId);
break;
}
......@@ -4016,7 +4314,7 @@ backend_read_statsfile(void)
/* Not there or too old, so kick the collector and wait a bit */
if ((count % PGSTAT_INQ_LOOP_COUNT) == 0)
pgstat_send_inquiry(cur_ts, min_ts);
pgstat_send_inquiry(cur_ts, min_ts, MyDatabaseId);
pg_usleep(PGSTAT_RETRY_DELAY * 1000L);
}
......@@ -4024,11 +4322,14 @@ backend_read_statsfile(void)
if (count >= PGSTAT_POLL_LOOP_COUNT)
elog(WARNING, "pgstat wait timeout");
/* Autovacuum launcher wants stats about all databases */
/*
* Autovacuum launcher wants stats about all databases, but a shallow read
* is sufficient.
*/
if (IsAutoVacuumLauncherProcess())
pgStatDBHash = pgstat_read_statsfile(InvalidOid, false);
pgStatDBHash = pgstat_read_statsfiles(InvalidOid, false, false);
else
pgStatDBHash = pgstat_read_statsfile(MyDatabaseId, false);
pgStatDBHash = pgstat_read_statsfiles(MyDatabaseId, false, true);
}
......@@ -4084,26 +4385,53 @@ pgstat_clear_snapshot(void)
static void
pgstat_recv_inquiry(PgStat_MsgInquiry *msg, int len)
{
slist_iter iter;
bool found = false;
DBWriteRequest *newreq;
PgStat_StatDBEntry *dbentry;
elog(DEBUG2, "received inquiry for %d", msg->databaseid);
/*
* Advance last_statrequest if this requestor has a newer cutoff time
* than any previous request.
* Find the last write request for this DB (found=true in that case).
* Plain linear search, not really worth doing any magic here (probably).
*/
if (msg->cutoff_time > last_statrequest)
last_statrequest = msg->cutoff_time;
slist_foreach(iter, &last_statrequests)
{
DBWriteRequest *req = slist_container(DBWriteRequest, next, iter.cur);
if (req->databaseid != msg->databaseid)
continue;
if (msg->cutoff_time > req->request_time)
req->request_time = msg->cutoff_time;
found = true;
return;
}
/*
* If the requestor's local clock time is older than last_statwrite, we
* There's no request for this DB yet, so create one.
*/
newreq = palloc(sizeof(DBWriteRequest));
newreq->databaseid = msg->databaseid;
newreq->request_time = msg->clock_time;
slist_push_head(&last_statrequests, &newreq->next);
/*
* If the requestor's local clock time is older than stats_timestamp, we
* should suspect a clock glitch, ie system time going backwards; though
* the more likely explanation is just delayed message receipt. It is
* worth expending a GetCurrentTimestamp call to be sure, since a large
* retreat in the system clock reading could otherwise cause us to neglect
* to update the stats file for a long time.
*/
if (msg->clock_time < last_statwrite)
dbentry = pgstat_get_db_entry(msg->databaseid, false);
if ((dbentry != NULL) && (msg->clock_time < dbentry->stats_timestamp))
{
TimestampTz cur_ts = GetCurrentTimestamp();
if (cur_ts < last_statwrite)
if (cur_ts < dbentry->stats_timestamp)
{
/*
* Sure enough, time went backwards. Force a new stats file write
......@@ -4113,15 +4441,16 @@ pgstat_recv_inquiry(PgStat_MsgInquiry *msg, int len)
char *mytime;
/* Copy because timestamptz_to_str returns a static buffer */
writetime = pstrdup(timestamptz_to_str(last_statwrite));
writetime = pstrdup(timestamptz_to_str(dbentry->stats_timestamp));
mytime = pstrdup(timestamptz_to_str(cur_ts));
elog(LOG, "last_statwrite %s is later than collector's time %s",
writetime, mytime);
elog(LOG,
"stats_timestamp %s is later than collector's time %s for db %d",
writetime, mytime, dbentry->databaseid);
pfree(writetime);
pfree(mytime);
last_statrequest = cur_ts;
last_statwrite = last_statrequest - 1;
newreq->request_time = cur_ts;
dbentry->stats_timestamp = cur_ts - 1;
}
}
}
......@@ -4270,29 +4599,36 @@ pgstat_recv_tabpurge(PgStat_MsgTabpurge *msg, int len)
static void
pgstat_recv_dropdb(PgStat_MsgDropdb *msg, int len)
{
Oid dbid = msg->m_databaseid;
PgStat_StatDBEntry *dbentry;
/*
* Lookup the database in the hashtable.
*/
dbentry = pgstat_get_db_entry(msg->m_databaseid, false);
dbentry = pgstat_get_db_entry(dbid, false);
/*
* If found, remove it.
* If found, remove it (along with the db statfile).
*/
if (dbentry)
{
char statfile[MAXPGPATH];
get_dbstat_filename(true, false, dbid, statfile, MAXPGPATH);
elog(DEBUG2, "removing %s", statfile);
unlink(statfile);
if (dbentry->tables != NULL)
hash_destroy(dbentry->tables);
if (dbentry->functions != NULL)
hash_destroy(dbentry->functions);
if (hash_search(pgStatDBHash,
(void *) &(dbentry->databaseid),
(void *) &dbid,
HASH_REMOVE, NULL) == NULL)
ereport(ERROR,
(errmsg("database hash table corrupted "
"during cleanup --- abort")));
(errmsg("database hash table corrupted during cleanup --- abort")));
}
}
......@@ -4306,7 +4642,6 @@ pgstat_recv_dropdb(PgStat_MsgDropdb *msg, int len)
static void
pgstat_recv_resetcounter(PgStat_MsgResetcounter *msg, int len)
{
HASHCTL hash_ctl;
PgStat_StatDBEntry *dbentry;
/*
......@@ -4330,43 +4665,10 @@ pgstat_recv_resetcounter(PgStat_MsgResetcounter *msg, int len)
dbentry->functions = NULL;
/*
* Reset database-level stats too. This should match the initialization
* code in pgstat_get_db_entry().
* Reset database-level stats, too. This creates empty hash tables for
* tables and functions.
*/
dbentry->n_xact_commit = 0;
dbentry->n_xact_rollback = 0;
dbentry->n_blocks_fetched = 0;
dbentry->n_blocks_hit = 0;
dbentry->n_tuples_returned = 0;
dbentry->n_tuples_fetched = 0;
dbentry->n_tuples_inserted = 0;
dbentry->n_tuples_updated = 0;
dbentry->n_tuples_deleted = 0;
dbentry->last_autovac_time = 0;
dbentry->n_temp_bytes = 0;
dbentry->n_temp_files = 0;
dbentry->n_deadlocks = 0;
dbentry->n_block_read_time = 0;
dbentry->n_block_write_time = 0;
dbentry->stat_reset_timestamp = GetCurrentTimestamp();
memset(&hash_ctl, 0, sizeof(hash_ctl));
hash_ctl.keysize = sizeof(Oid);
hash_ctl.entrysize = sizeof(PgStat_StatTabEntry);
hash_ctl.hash = oid_hash;
dbentry->tables = hash_create("Per-database table",
PGSTAT_TAB_HASH_SIZE,
&hash_ctl,
HASH_ELEM | HASH_FUNCTION);
hash_ctl.keysize = sizeof(Oid);
hash_ctl.entrysize = sizeof(PgStat_StatFuncEntry);
hash_ctl.hash = oid_hash;
dbentry->functions = hash_create("Per-database function",
PGSTAT_FUNCTION_HASH_SIZE,
&hash_ctl,
HASH_ELEM | HASH_FUNCTION);
reset_dbentry_counters(dbentry);
}
/* ----------
......@@ -4687,3 +4989,42 @@ pgstat_recv_funcpurge(PgStat_MsgFuncpurge *msg, int len)
HASH_REMOVE, NULL);
}
}
/* ----------
* pgstat_write_statsfile_needed() -
*
* Do we need to write out the files?
* ----------
*/
static bool
pgstat_write_statsfile_needed(void)
{
if (!slist_is_empty(&last_statrequests))
return true;
/* Everything was written recently */
return false;
}
/* ----------
* pgstat_db_requested() -
*
* Checks whether stats for a particular DB need to be written to a file.
* ----------
*/
static bool
pgstat_db_requested(Oid databaseid)
{
slist_iter iter;
/* Check the databases if they need to refresh the stats. */
slist_foreach(iter, &last_statrequests)
{
DBWriteRequest *req = slist_container(DBWriteRequest, next, iter.cur);
if (req->databaseid == databaseid)
return true;
}
return false;
}
......@@ -8705,14 +8705,23 @@ static void
assign_pgstat_temp_directory(const char *newval, void *extra)
{
/* check_canonical_path already canonicalized newval for us */
char *dname;
char *tname;
char *fname;
tname = guc_malloc(ERROR, strlen(newval) + 12); /* /pgstat.tmp */
sprintf(tname, "%s/pgstat.tmp", newval);
fname = guc_malloc(ERROR, strlen(newval) + 13); /* /pgstat.stat */
sprintf(fname, "%s/pgstat.stat", newval);
/* directory */
dname = guc_malloc(ERROR, strlen(newval) + 1); /* runtime dir */
sprintf(dname, "%s", newval);
/* global stats */
tname = guc_malloc(ERROR, strlen(newval) + 12); /* /global.tmp */
sprintf(tname, "%s/global.tmp", newval);
fname = guc_malloc(ERROR, strlen(newval) + 13); /* /global.stat */
sprintf(fname, "%s/global.stat", newval);
if (pgstat_stat_directory)
free(pgstat_stat_directory);
pgstat_stat_directory = dname;
if (pgstat_stat_tmpname)
free(pgstat_stat_tmpname);
pgstat_stat_tmpname = tname;
......
......@@ -192,6 +192,7 @@ const char *subdirs[] = {
"base",
"base/1",
"pg_tblspc",
"pg_stat",
"pg_stat_tmp"
};
......
......@@ -53,6 +53,6 @@
*/
/* yyyymmddN */
#define CATALOG_VERSION_NO 201302131
#define CATALOG_VERSION_NO 201302181
#endif
......@@ -205,6 +205,7 @@ typedef struct PgStat_MsgInquiry
PgStat_MsgHdr m_hdr;
TimestampTz clock_time; /* observed local clock time */
TimestampTz cutoff_time; /* minimum acceptable file timestamp */
Oid databaseid; /* requested DB (InvalidOid => all DBs) */
} PgStat_MsgInquiry;
......@@ -514,7 +515,7 @@ typedef union PgStat_Msg
* ------------------------------------------------------------
*/
#define PGSTAT_FILE_FORMAT_ID 0x01A5BC9A
#define PGSTAT_FILE_FORMAT_ID 0x01A5BC9B
/* ----------
* PgStat_StatDBEntry The collector's data per database
......@@ -545,6 +546,7 @@ typedef struct PgStat_StatDBEntry
PgStat_Counter n_block_write_time;
TimestampTz stat_reset_timestamp;
TimestampTz stats_timestamp; /* time of db stats file update */
/*
* tables and functions must be last in the struct, because we don't write
......@@ -722,6 +724,7 @@ extern bool pgstat_track_activities;
extern bool pgstat_track_counts;
extern int pgstat_track_functions;
extern PGDLLIMPORT int pgstat_track_activity_query_size;
extern char *pgstat_stat_directory;
extern char *pgstat_stat_tmpname;
extern char *pgstat_stat_filename;
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment