Commit e2a186b0 authored by Alvaro Herrera

Add a multi-worker capability to autovacuum. This allows multiple worker
processes to be running simultaneously.  Also, now autovacuum processes do not
count towards the max_connections limit; they are counted separately from
regular processes, and are limited by the new GUC variable
autovacuum_max_workers.

The launcher now has the intelligence to launch workers on each database every
autovacuum_naptime seconds, limited only by the number of worker slots
available.
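
As an illustration of the scheduling idea, the sketch below spreads the
start times of the databases evenly across one autovacuum_naptime interval,
mirroring the millis_increment computation in rebuild_database_list further
down in this diff. It is a simplified stand-in, not the committed code: the
function name sketch_schedule and the plain int64_t millisecond timestamps
are assumptions made for the example.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

/*
 * Hypothetical sketch: assign each of ndbs databases a start time so that
 * they are spaced evenly over the next naptime_secs seconds. Timestamps are
 * plain milliseconds here, not PostgreSQL's TimestampTz.
 */
static void
sketch_schedule(int64_t now_ms, int naptime_secs,
                int64_t *next_start_ms, size_t ndbs)
{
    int64_t increment_ms;
    size_t  i;

    assert(naptime_secs > 0);

    if (ndbs == 0)
        return;                 /* nothing to schedule */

    /* interval between two consecutive databases in the schedule */
    increment_ms = (int64_t) naptime_secs * 1000 / (int64_t) ndbs;

    for (i = 0; i < ndbs; i++)
    {
        now_ms += increment_ms;
        next_start_ms[i] = now_ms;
    }
}
```

With three databases and a naptime of 60 seconds, the databases are
scheduled 20, 40 and 60 seconds from now, so each one is revisited about
once per naptime.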

Also, the global worker I/O utilization is limited by the vacuum cost-based
delay feature.  Workers are "balanced" so that the total I/O consumption does
not exceed the established limit.  This part of the patch was contributed by
ITAGAKI Takahiro.
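
A minimal sketch of the balancing idea follows, assuming a purely
proportional split of one global cost limit by each worker's base limit.
The names SketchWorker and sketch_balance_cost are hypothetical, and the
actual autovac_balance_cost in this commit is not reproduced here; it may
weigh additional parameters such as the cost delay.

```c
#include <assert.h>
#include <stddef.h>

/* Hypothetical, simplified stand-in for the per-worker cost fields. */
typedef struct
{
    int base_limit;         /* limit the worker would use if it ran alone */
    int balanced_limit;     /* share of the global limit, filled in below */
} SketchWorker;

/*
 * Split global_limit among the workers in proportion to their base limits.
 * Integer division rounds each share down, so the shares never add up to
 * more than global_limit. A share can round down to zero when there are
 * many workers; real code would have to guard against that.
 */
static void
sketch_balance_cost(SketchWorker *workers, size_t nworkers, int global_limit)
{
    long long total_base = 0;
    size_t    i;

    assert(global_limit > 0);

    for (i = 0; i < nworkers; i++)
    {
        assert(workers[i].base_limit > 0);
        total_base += workers[i].base_limit;
    }

    for (i = 0; i < nworkers; i++)
    {
        long long share = (long long) global_limit *
                          workers[i].base_limit / total_base;

        /* never hand a worker more than it asked for */
        if (share > workers[i].base_limit)
            share = workers[i].base_limit;
        workers[i].balanced_limit = (int) share;
    }
}
```

For base limits of 200, 200 and 400 under a global limit of 200, the
workers receive 50, 50 and 100, which add up to the global limit.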

Per discussion.
parent 42dc4b66
<!-- $PostgreSQL: pgsql/doc/src/sgml/config.sgml,v 1.119 2007/04/02 15:27:02 petere Exp $ -->
<!-- $PostgreSQL: pgsql/doc/src/sgml/config.sgml,v 1.120 2007/04/16 18:29:50 alvherre Exp $ -->
<chapter Id="runtime-config">
<title>Server Configuration</title>
......@@ -3166,7 +3166,7 @@ SELECT * FROM parent WHERE key = 2400;
<listitem>
<para>
Controls whether the server should run the
autovacuum daemon. This is off by default.
autovacuum launcher daemon. This is on by default.
<varname>stats_start_collector</> and <varname>stats_row_level</>
must also be turned on for autovacuum to work.
This parameter can only be set in the <filename>postgresql.conf</>
......@@ -3175,6 +3175,21 @@ SELECT * FROM parent WHERE key = 2400;
</listitem>
</varlistentry>
<varlistentry id="guc-autovacuum-max-workers" xreflabel="autovacuum_max_workers">
<term><varname>autovacuum_max_workers</varname> (<type>integer</type>)</term>
<indexterm>
<primary><varname>autovacuum_max_workers</> configuration parameter</primary>
</indexterm>
<listitem>
<para>
Specifies the maximum number of autovacuum processes (other than the
autovacuum launcher) which may be running at any one time. The default
is three (<literal>3</literal>). This parameter can only be set in
the <filename>postgresql.conf</> file or on the server command line.
</para>
</listitem>
</varlistentry>
<varlistentry id="guc-autovacuum-naptime" xreflabel="autovacuum_naptime">
<term><varname>autovacuum_naptime</varname> (<type>integer</type>)</term>
<indexterm>
......@@ -3182,9 +3197,9 @@ SELECT * FROM parent WHERE key = 2400;
</indexterm>
<listitem>
<para>
Specifies the delay between activity rounds for the autovacuum
daemon. In each round the daemon examines one database
and issues <command>VACUUM</> and <command>ANALYZE</> commands
Specifies the minimum delay between autovacuum runs on any given
database. In each round the daemon examines the
database and issues <command>VACUUM</> and <command>ANALYZE</> commands
as needed for tables in that database. The delay is measured
in seconds, and the default is one minute (<literal>1m</>).
This parameter can only be set in the <filename>postgresql.conf</>
......@@ -3318,7 +3333,10 @@ SELECT * FROM parent WHERE key = 2400;
Specifies the cost limit value that will be used in automatic
<command>VACUUM</> operations. If <literal>-1</> is specified (which is the
default), the regular
<xref linkend="guc-vacuum-cost-limit"> value will be used.
<xref linkend="guc-vacuum-cost-limit"> value will be used. Note that
the value is distributed proportionally among the running autovacuum
workers, if there is more than one, so that the sum of the workers'
limits never exceeds the value of this variable.
This parameter can only be set in the <filename>postgresql.conf</>
file or on the server command line.
This setting can be overridden for individual tables by entries in
......
<!-- $PostgreSQL: pgsql/doc/src/sgml/maintenance.sgml,v 1.70 2007/02/01 19:10:24 momjian Exp $ -->
<!-- $PostgreSQL: pgsql/doc/src/sgml/maintenance.sgml,v 1.71 2007/04/16 18:29:50 alvherre Exp $ -->
<chapter id="maintenance">
<title>Routine Database Maintenance Tasks</title>
......@@ -466,26 +466,43 @@ HINT: Stop the postmaster and use a standalone backend to VACUUM in "mydb".
<secondary>general information</secondary>
</indexterm>
<para>
Beginning in <productname>PostgreSQL </productname> 8.1, there is a
separate optional server process called the <firstterm>autovacuum
daemon</firstterm>, whose purpose is to automate the execution of
Beginning in <productname>PostgreSQL</productname> 8.1, there is an
optional feature called <firstterm>autovacuum</firstterm>,
whose purpose is to automate the execution of
<command>VACUUM</command> and <command>ANALYZE </command> commands.
When enabled, the autovacuum daemon runs periodically and checks for
When enabled, autovacuum checks for
tables that have had a large number of inserted, updated or deleted
tuples. These checks use the row-level statistics collection facility;
therefore, the autovacuum daemon cannot be used unless <xref
therefore, autovacuum cannot be used unless <xref
linkend="guc-stats-start-collector"> and <xref
linkend="guc-stats-row-level"> are set to <literal>true</literal>. Also,
it's important to allow a slot for the autovacuum process when choosing
the value of <xref linkend="guc-superuser-reserved-connections">. In
the default configuration, autovacuuming is enabled and the related
linkend="guc-stats-row-level"> are set to <literal>true</literal>.
In the default configuration, autovacuuming is enabled and the related
configuration parameters are appropriately set.
</para>
<para>
The autovacuum daemon, when enabled, runs every <xref
linkend="guc-autovacuum-naptime"> seconds. On each run, it selects
one database to process and checks each table within that database.
Beginning in <productname>PostgreSQL</productname> 8.3, autovacuum has a
multi-process architecture: there is a daemon process, called the
<firstterm>autovacuum launcher</firstterm>, which is in charge of starting
an <firstterm>autovacuum worker</firstterm> process on each database every
<xref linkend="guc-autovacuum-naptime"> seconds.
</para>
<para>
There is a limit of <xref linkend="guc-autovacuum-max-workers"> worker
processes that may be running at any time, so if the <command>VACUUM</>
and <command>ANALYZE</> work takes too long to run, the deadline may
be missed for other databases. Also, if a particular database
takes a long time to process, more than one worker may be processing it
simultaneously. The workers are smart enough to avoid repeating work that
other workers have done, so this is normally not a problem. Note that the
number of running workers does not count towards the <xref
linkend="guc-max-connections"> nor the <xref
linkend="guc-superuser-reserved-connections"> limits.
</para>
<para>
On each run, the worker process checks each table within that database, and
<command>VACUUM</command> or <command>ANALYZE</command> commands are
issued as needed.
</para>
......@@ -581,6 +598,12 @@ analyze threshold = analyze base threshold + analyze scale factor * number of tu
</para>
</caution>
<para>
When multiple workers are running, the cost limit is "balanced" among all
the running workers, so that the total impact on the system is the same,
regardless of the number of workers actually running.
</para>
</sect2>
</sect1>
......
......@@ -13,7 +13,7 @@
*
*
* IDENTIFICATION
* $PostgreSQL: pgsql/src/backend/commands/vacuum.c,v 1.349 2007/03/14 18:48:55 tgl Exp $
* $PostgreSQL: pgsql/src/backend/commands/vacuum.c,v 1.350 2007/04/16 18:29:50 alvherre Exp $
*
*-------------------------------------------------------------------------
*/
......@@ -3504,6 +3504,9 @@ vacuum_delay_point(void)
VacuumCostBalance = 0;
/* update balance values for workers */
AutoVacuumUpdateDelay();
/* Might have gotten an interrupt while sleeping */
CHECK_FOR_INTERRUPTS();
}
......
......@@ -10,7 +10,7 @@
*
*
* IDENTIFICATION
* $PostgreSQL: pgsql/src/backend/postmaster/autovacuum.c,v 1.40 2007/03/28 22:17:12 alvherre Exp $
* $PostgreSQL: pgsql/src/backend/postmaster/autovacuum.c,v 1.41 2007/04/16 18:29:52 alvherre Exp $
*
*-------------------------------------------------------------------------
*/
......@@ -52,6 +52,7 @@
#include "utils/syscache.h"
static volatile sig_atomic_t got_SIGUSR1 = false;
static volatile sig_atomic_t got_SIGHUP = false;
static volatile sig_atomic_t avlauncher_shutdown_request = false;
......@@ -59,6 +60,7 @@ static volatile sig_atomic_t avlauncher_shutdown_request = false;
* GUC parameters
*/
bool autovacuum_start_daemon = false;
int autovacuum_max_workers;
int autovacuum_naptime;
int autovacuum_vac_thresh;
double autovacuum_vac_scale;
......@@ -69,7 +71,7 @@ int autovacuum_freeze_max_age;
int autovacuum_vac_cost_delay;
int autovacuum_vac_cost_limit;
/* Flag to tell if we are in the autovacuum daemon process */
/* Flags to tell if we are in an autovacuum process */
static bool am_autovacuum_launcher = false;
static bool am_autovacuum_worker = false;
......@@ -82,14 +84,22 @@ static int default_freeze_min_age;
/* Memory context for long-lived data */
static MemoryContext AutovacMemCxt;
/* struct to keep list of candidate databases for vacuum */
typedef struct autovac_dbase
/* struct to keep track of databases in launcher */
typedef struct avl_dbase
{
Oid ad_datid;
char *ad_name;
TransactionId ad_frozenxid;
PgStat_StatDBEntry *ad_entry;
} autovac_dbase;
Oid adl_datid; /* hash key -- must be first */
TimestampTz adl_next_worker;
int adl_score;
} avl_dbase;
/* struct to keep track of databases in worker */
typedef struct avw_dbase
{
Oid adw_datid;
char *adw_name;
TransactionId adw_frozenxid;
PgStat_StatDBEntry *adw_entry;
} avw_dbase;
/* struct to keep track of tables to vacuum and/or analyze, in 1st pass */
typedef struct av_relation
......@@ -110,14 +120,73 @@ typedef struct autovac_table
int at_vacuum_cost_limit;
} autovac_table;
/*-------------
* This struct holds information about a single worker's whereabouts. We keep
* an array of these in shared memory, sized according to
* autovacuum_max_workers.
*
* wi_links entry into free list or running list
* wi_dboid OID of the database this worker is supposed to work on
* wi_tableoid OID of the table currently being vacuumed
* wi_workerpid PID of the running worker, 0 if not yet started
* wi_launchtime Time at which this worker was launched
* wi_cost_* Vacuum cost-based delay parameters current in this worker
*
* All fields are protected by AutovacuumLock, except for wi_tableoid which is
* protected by AutovacuumScheduleLock (which is read-only for everyone except
* that worker itself).
*-------------
*/
typedef struct WorkerInfoData
{
SHM_QUEUE wi_links;
Oid wi_dboid;
Oid wi_tableoid;
int wi_workerpid;
TimestampTz wi_launchtime;
int wi_cost_delay;
int wi_cost_limit;
int wi_cost_limit_base;
} WorkerInfoData;
typedef struct WorkerInfoData *WorkerInfo;
/*-------------
* The main autovacuum shmem struct. On shared memory we store this main
* struct and the array of WorkerInfo structs. This struct keeps:
*
* av_launcherpid the PID of the autovacuum launcher
* av_freeWorkers the WorkerInfo freelist
* av_runningWorkers the WorkerInfo non-free queue
* av_startingWorker pointer to WorkerInfo currently being started (cleared by
* the worker itself as soon as it's up and running)
* av_rebalance true when a worker determines that cost limits must be
* rebalanced
*
* This struct is protected by AutovacuumLock.
*-------------
*/
typedef struct
{
Oid process_db; /* OID of database to process */
int worker_pid; /* PID of the worker process, if any */
pid_t av_launcherpid;
SHMEM_OFFSET av_freeWorkers;
SHM_QUEUE av_runningWorkers;
SHMEM_OFFSET av_startingWorker;
bool av_rebalance;
} AutoVacuumShmemStruct;
static AutoVacuumShmemStruct *AutoVacuumShmem;
/* the database list in the launcher, and the context that contains it */
static Dllist *DatabaseList = NULL;
static MemoryContext DatabaseListCxt = NULL;
/* Pointer to my own WorkerInfo, valid on each worker */
static WorkerInfo MyWorkerInfo = NULL;
/* PID of launcher, valid only in worker while shutting down */
int AutovacuumLauncherPid = 0;
#ifdef EXEC_BACKEND
static pid_t avlauncher_forkexec(void);
static pid_t avworker_forkexec(void);
......@@ -125,9 +194,16 @@ static pid_t avworker_forkexec(void);
NON_EXEC_STATIC void AutoVacWorkerMain(int argc, char *argv[]);
NON_EXEC_STATIC void AutoVacLauncherMain(int argc, char *argv[]);
static void do_start_worker(void);
static Oid do_start_worker(void);
static uint64 launcher_determine_sleep(bool canlaunch, bool recursing);
static void launch_worker(TimestampTz now);
static List *get_database_list(void);
static void rebuild_database_list(Oid newdb);
static int db_comparator(const void *a, const void *b);
static void autovac_balance_cost(void);
static void do_autovacuum(void);
static List *autovac_get_database_list(void);
static void FreeWorkerInfo(int code, Datum arg);
static void relation_check_autovac(Oid relid, Form_pg_class classForm,
Form_pg_autovacuum avForm, PgStat_StatTabEntry *tabentry,
......@@ -147,6 +223,7 @@ static PgStat_StatTabEntry *get_pgstat_tabentry_relid(Oid relid, bool isshared,
PgStat_StatDBEntry *dbentry);
static void autovac_report_activity(VacuumStmt *vacstmt, Oid relid);
static void avl_sighup_handler(SIGNAL_ARGS);
static void avl_sigusr1_handler(SIGNAL_ARGS);
static void avlauncher_shutdown(SIGNAL_ARGS);
static void avl_quickdie(SIGNAL_ARGS);
......@@ -230,12 +307,34 @@ StartAutoVacLauncher(void)
/*
* Main loop for the autovacuum launcher process.
*
* The signalling between launcher and worker is as follows:
*
* When the worker has finished starting up, it stores its PID in wi_workerpid
* and sends a SIGUSR1 signal to the launcher. The launcher then knows that
* the postmaster is ready to start a new worker. We do it this way because
* otherwise we risk calling SendPostmasterSignal() when the postmaster hasn't
* yet processed the last one, in which case the second signal would be lost.
* This is only useful when two workers need to be started close to one
* another, which should be rare but it's possible.
*
* When a worker exits, it resets the WorkerInfo struct and puts it back into
* the free list. If there is no free worker slot, it will also signal the
* launcher, which then wakes up and can launch a new worker if it needs to.
* Note that we only need to do it when there's no free worker slot, because
* otherwise there is no need -- the launcher would be awakened normally per
* schedule.
*
* There is a potential problem if, for some reason, a worker starts and is not
* able to bootstrap itself correctly. To prevent this situation from starving
* the whole system, the launcher checks the launch time of the "starting
* worker". If it's too old (older than autovacuum_naptime seconds), it resets
* the worker entry and puts it back into the free list.
*/
NON_EXEC_STATIC void
AutoVacLauncherMain(int argc, char *argv[])
{
sigjmp_buf local_sigjmp_buf;
MemoryContext avlauncher_cxt;
/* we are a postmaster subprocess now */
IsUnderPostmaster = true;
......@@ -264,9 +363,6 @@ AutoVacLauncherMain(int argc, char *argv[])
* Set up signal handlers. Since this is an auxiliary process, it has
* particular signal requirements -- no deadlock checker or sinval
* catchup, for example.
*
* XXX It may be a good idea to receive signals when an avworker process
* finishes.
*/
pqsignal(SIGHUP, avl_sighup_handler);
......@@ -276,7 +372,7 @@ AutoVacLauncherMain(int argc, char *argv[])
pqsignal(SIGALRM, SIG_IGN);
pqsignal(SIGPIPE, SIG_IGN);
pqsignal(SIGUSR1, SIG_IGN);
pqsignal(SIGUSR1, avl_sigusr1_handler);
/* We don't listen for async notifies */
pqsignal(SIGUSR2, SIG_IGN);
pqsignal(SIGFPE, FloatExceptionHandler);
......@@ -300,12 +396,12 @@ AutoVacLauncherMain(int argc, char *argv[])
* that we can reset the context during error recovery and thereby avoid
* possible memory leaks.
*/
avlauncher_cxt = AllocSetContextCreate(TopMemoryContext,
"Autovacuum Launcher",
ALLOCSET_DEFAULT_MINSIZE,
ALLOCSET_DEFAULT_INITSIZE,
ALLOCSET_DEFAULT_MAXSIZE);
MemoryContextSwitchTo(avlauncher_cxt);
AutovacMemCxt = AllocSetContextCreate(TopMemoryContext,
"Autovacuum Launcher",
ALLOCSET_DEFAULT_MINSIZE,
ALLOCSET_DEFAULT_INITSIZE,
ALLOCSET_DEFAULT_MAXSIZE);
MemoryContextSwitchTo(AutovacMemCxt);
/*
......@@ -336,11 +432,15 @@ AutoVacLauncherMain(int argc, char *argv[])
* Now return to normal top-level context and clear ErrorContext for
* next time.
*/
MemoryContextSwitchTo(avlauncher_cxt);
MemoryContextSwitchTo(AutovacMemCxt);
FlushErrorState();
/* Flush any leaked data in the top-level context */
MemoryContextResetAndDeleteChildren(avlauncher_cxt);
MemoryContextResetAndDeleteChildren(AutovacMemCxt);
/* don't leave dangling pointers to freed memory */
DatabaseListCxt = NULL;
DatabaseList = NULL;
/* Make sure pgstat also considers our stat data as gone */
pgstat_clear_snapshot();
......@@ -361,18 +461,32 @@ AutoVacLauncherMain(int argc, char *argv[])
ereport(LOG,
(errmsg("autovacuum launcher started")));
/* must unblock signals before calling rebuild_database_list */
PG_SETMASK(&UnBlockSig);
/* in emergency mode, just start a worker and go away */
if (!autovacuum_start_daemon)
{
do_start_worker();
proc_exit(0); /* done */
}
AutoVacuumShmem->av_launcherpid = MyProcPid;
/*
* take a nap before executing the first iteration, unless we were
* requested an emergency run.
* Create the initial database list. The invariant we want this list to
* keep is that it's ordered by decreasing next_time. As soon as an entry
* is updated to a higher time, it will be moved to the front (which is
* correct because the only operation is to add autovacuum_naptime to the
* entry, and time always increases).
*/
if (autovacuum_start_daemon)
pg_usleep(autovacuum_naptime * 1000000L);
rebuild_database_list(InvalidOid);
for (;;)
{
int worker_pid;
uint64 micros;
bool can_launch;
TimestampTz current_time = 0;
/*
* Emergency bailout if postmaster has died. This is to avoid the
......@@ -381,6 +495,13 @@ AutoVacLauncherMain(int argc, char *argv[])
if (!PostmasterIsAlive(true))
exit(1);
micros = launcher_determine_sleep(AutoVacuumShmem->av_freeWorkers !=
INVALID_OFFSET, false);
/* Sleep for a while according to schedule */
pg_usleep(micros);
/* the normal shutdown case */
if (avlauncher_shutdown_request)
break;
......@@ -388,82 +509,455 @@ AutoVacLauncherMain(int argc, char *argv[])
{
got_SIGHUP = false;
ProcessConfigFile(PGC_SIGHUP);
/* rebalance in case the default cost parameters changed */
LWLockAcquire(AutovacuumLock, LW_EXCLUSIVE);
autovac_balance_cost();
LWLockRelease(AutovacuumLock);
/* rebuild the list in case the naptime changed */
rebuild_database_list(InvalidOid);
}
/* a worker started up or finished */
if (got_SIGUSR1)
{
got_SIGUSR1 = false;
/* rebalance cost limits, if needed */
if (AutoVacuumShmem->av_rebalance)
{
LWLockAcquire(AutovacuumLock, LW_EXCLUSIVE);
AutoVacuumShmem->av_rebalance = false;
autovac_balance_cost();
LWLockRelease(AutovacuumLock);
}
}
/*
* if there's a worker already running, sleep until it
* disappears.
* There are some conditions that we need to check before trying to
* start a worker. First, we need to make sure that there is a
* worker slot available. Second, we need to make sure that no other
* worker is still starting up.
*/
LWLockAcquire(AutovacuumLock, LW_SHARED);
worker_pid = AutoVacuumShmem->worker_pid;
LWLockRelease(AutovacuumLock);
if (worker_pid != 0)
{
PGPROC *proc = BackendPidGetProc(worker_pid);
can_launch = (AutoVacuumShmem->av_freeWorkers != INVALID_OFFSET);
if (proc != NULL && proc->isAutovacuum)
goto sleep;
if (can_launch && AutoVacuumShmem->av_startingWorker != INVALID_OFFSET)
{
long secs;
int usecs;
WorkerInfo worker = (WorkerInfo) MAKE_PTR(AutoVacuumShmem->av_startingWorker);
if (current_time == 0)
current_time = GetCurrentTimestamp();
/*
* We can't launch another worker when another one is still
* starting up, so just sleep for a bit more; that worker will wake
* us up again as soon as it's ready. We will only wait
* autovacuum_naptime seconds for this to happen however. Note
* that failure to connect to a particular database is not a
* problem here, because the worker removes itself from the
* startingWorker pointer before trying to connect; only low-level
* problems, like fork() failure, can get us here.
*/
TimestampDifference(worker->wi_launchtime, current_time,
&secs, &usecs);
/* ignore microseconds, as they cannot make any difference */
if (secs > autovacuum_naptime)
{
LWLockRelease(AutovacuumLock);
LWLockAcquire(AutovacuumLock, LW_EXCLUSIVE);
/*
* No other process can put a worker in starting mode, so if
* startingWorker is still valid after exchanging our lock,
* we assume it's the same one we saw above (so we don't
* recheck the launch time).
*/
if (AutoVacuumShmem->av_startingWorker != INVALID_OFFSET)
{
worker = (WorkerInfo) MAKE_PTR(AutoVacuumShmem->av_startingWorker);
worker->wi_dboid = InvalidOid;
worker->wi_tableoid = InvalidOid;
worker->wi_workerpid = 0;
worker->wi_launchtime = 0;
worker->wi_links.next = AutoVacuumShmem->av_freeWorkers;
AutoVacuumShmem->av_freeWorkers = MAKE_OFFSET(worker);
AutoVacuumShmem->av_startingWorker = INVALID_OFFSET;
}
}
else
{
/*
* if the worker is not really running (or it's a process
* that's not an autovacuum worker), remove the PID from shmem.
* This should not happen, because either the worker exits
* cleanly, in which case it'll remove the PID, or it dies, in
* which case postmaster will cause a system reset cycle.
* maybe the postmaster neglected this start signal --
* resend it. Note: the constraints in
* launcher_determine_sleep keep us from delivering signals too
* quickly (at most once every 100ms).
*/
LWLockAcquire(AutovacuumLock, LW_EXCLUSIVE);
worker_pid = 0;
LWLockRelease(AutovacuumLock);
SendPostmasterSignal(PMSIGNAL_START_AUTOVAC_WORKER);
can_launch = false;
}
}
LWLockRelease(AutovacuumLock); /* either shared or exclusive */
do_start_worker();
if (can_launch)
{
Dlelem *elem;
sleep:
/*
* in emergency mode, exit immediately so that the postmaster can
* request another run right away if needed.
*
* XXX -- maybe it would be better to handle this inside the launcher
* itself.
*/
if (!autovacuum_start_daemon)
break;
elem = DLGetTail(DatabaseList);
/* have pgstat read the file again next time */
pgstat_clear_snapshot();
if (current_time == 0)
current_time = GetCurrentTimestamp();
if (elem != NULL)
{
avl_dbase *avdb = DLE_VAL(elem);
long secs;
int usecs;
/* now sleep until the next autovac iteration */
pg_usleep(autovacuum_naptime * 1000000L);
TimestampDifference(current_time, avdb->adl_next_worker, &secs, &usecs);
/* do we have to start a worker? */
if (secs <= 0 && usecs <= 0)
launch_worker(current_time);
}
else
{
/*
* Special case when the list is empty: start a worker right
* away. This covers the initial case, when no database is in
* pgstats (thus the list is empty). Note that the constraints
* in launcher_determine_sleep keep us from starting workers
* too quickly (at most once every autovacuum_naptime when the
* list is empty).
*/
launch_worker(current_time);
}
}
}
/* Normal exit from the autovac launcher is here */
ereport(LOG,
(errmsg("autovacuum launcher shutting down")));
AutoVacuumShmem->av_launcherpid = 0;
proc_exit(0); /* done */
}
/*
* Determine the time to sleep, in microseconds, based on the database list.
*
* The "canlaunch" parameter indicates whether we can start a worker right now;
* it is false, for example, when all the workers are busy.
*/
static uint64
launcher_determine_sleep(bool canlaunch, bool recursing)
{
long secs;
int usecs;
Dlelem *elem;
/*
* We sleep until the next scheduled vacuum. We trust that when the
* database list was built, care was taken so that no entries have times in
* the past; if the first entry has too close a next_worker value, or a
* time in the past, we will sleep a small nominal time.
*/
if (!canlaunch)
{
secs = autovacuum_naptime;
usecs = 0;
}
else if ((elem = DLGetTail(DatabaseList)) != NULL)
{
avl_dbase *avdb = DLE_VAL(elem);
TimestampTz current_time = GetCurrentTimestamp();
TimestampTz next_wakeup;
next_wakeup = avdb->adl_next_worker;
TimestampDifference(current_time, next_wakeup, &secs, &usecs);
}
else
{
/* list is empty, sleep for whole autovacuum_naptime seconds */
secs = autovacuum_naptime;
usecs = 0;
}
/*
* If the result is exactly zero, it means a database had an entry with
* time in the past. Rebuild the list so that the databases are evenly
* distributed again, and recalculate the time to sleep. This can happen
* if there are more tables needing vacuum than workers, and they all take
* longer to vacuum than autovacuum_naptime.
*
* We only recurse once. rebuild_database_list should always return times
* in the future, but it seems best not to trust too much on that.
*/
if (secs == 0L && usecs == 0 && !recursing)
{
rebuild_database_list(InvalidOid);
return launcher_determine_sleep(canlaunch, true);
}
/* 100ms is the smallest time we'll allow the launcher to sleep */
if (secs <= 0L && usecs <= 100000)
{
secs = 0L;
usecs = 100000; /* 100 ms */
}
return secs * 1000000 + usecs;
}
/*
* Build an updated DatabaseList. It must only contain databases that appear
* in pgstats, and must be sorted by next_worker from highest to lowest,
* distributed regularly across the next autovacuum_naptime interval.
*
* Receives the Oid of the database that made this list be generated (we call
* this the "new" database, because when the database was already present on
* the list, we expect that this function is not called at all). The
* preexisting list, if any, will be used to preserve the order of the
* databases in the autovacuum_naptime period. The new database is put at the
* end of the interval. The actual values are not saved, which should not be
* much of a problem.
*/
static void
rebuild_database_list(Oid newdb)
{
List *dblist;
ListCell *cell;
MemoryContext newcxt;
MemoryContext oldcxt;
MemoryContext tmpcxt;
HASHCTL hctl;
int score;
int nelems;
HTAB *dbhash;
/* use fresh stats */
pgstat_clear_snapshot();
newcxt = AllocSetContextCreate(AutovacMemCxt,
"AV dblist",
ALLOCSET_DEFAULT_MINSIZE,
ALLOCSET_DEFAULT_INITSIZE,
ALLOCSET_DEFAULT_MAXSIZE);
tmpcxt = AllocSetContextCreate(newcxt,
"tmp AV dblist",
ALLOCSET_DEFAULT_MINSIZE,
ALLOCSET_DEFAULT_INITSIZE,
ALLOCSET_DEFAULT_MAXSIZE);
oldcxt = MemoryContextSwitchTo(tmpcxt);
/*
* Implementing this is not as simple as it sounds, because we need to put
* the new database at the end of the list; next the databases that were
* already on the list, and finally (at the tail of the list) all the other
* databases that are not on the existing list.
*
* To do this, we build an empty hash table of scored databases. We will
* start with the lowest score (zero) for the new database, then increasing
* scores for the databases in the existing list, in order, and lastly
* increasing scores for all databases gotten via get_database_list() that
* are not already on the hash.
*
* Then we will put all the hash elements into an array, sort the array by
* score, and finally put the array elements into the new doubly linked
* list.
*/
hctl.keysize = sizeof(Oid);
hctl.entrysize = sizeof(avl_dbase);
hctl.hash = oid_hash;
hctl.hcxt = tmpcxt;
dbhash = hash_create("db hash", 20, &hctl, /* magic number here FIXME */
HASH_ELEM | HASH_FUNCTION | HASH_CONTEXT);
/* start by inserting the new database */
score = 0;
if (OidIsValid(newdb))
{
avl_dbase *db;
PgStat_StatDBEntry *entry;
/* only consider this database if it has a pgstat entry */
entry = pgstat_fetch_stat_dbentry(newdb);
if (entry != NULL)
{
/* we assume it isn't found because the hash was just created */
db = hash_search(dbhash, &newdb, HASH_ENTER, NULL);
/* hash_search already filled in the key */
db->adl_score = score++;
/* next_worker is filled in later */
}
}
/* Now insert the databases from the existing list */
if (DatabaseList != NULL)
{
Dlelem *elem;
elem = DLGetHead(DatabaseList);
while (elem != NULL)
{
avl_dbase *avdb = DLE_VAL(elem);
avl_dbase *db;
bool found;
PgStat_StatDBEntry *entry;
elem = DLGetSucc(elem);
/*
* skip databases with no stat entries -- in particular, this
* gets rid of dropped databases
*/
entry = pgstat_fetch_stat_dbentry(avdb->adl_datid);
if (entry == NULL)
continue;
db = hash_search(dbhash, &(avdb->adl_datid), HASH_ENTER, &found);
if (!found)
{
/* hash_search already filled in the key */
db->adl_score = score++;
/* next_worker is filled in later */
}
}
}
/* finally, insert all qualifying databases not previously inserted */
dblist = get_database_list();
foreach(cell, dblist)
{
avw_dbase *avdb = lfirst(cell);
avl_dbase *db;
bool found;
PgStat_StatDBEntry *entry;
/* only consider databases with a pgstat entry */
entry = pgstat_fetch_stat_dbentry(avdb->adw_datid);
if (entry == NULL)
continue;
db = hash_search(dbhash, &(avdb->adw_datid), HASH_ENTER, &found);
/* only update the score if the database was not already on the hash */
if (!found)
{
/* hash_search already filled in the key */
db->adl_score = score++;
/* next_worker is filled in later */
}
}
nelems = score;
/* from here on, the allocated memory belongs to the new list */
MemoryContextSwitchTo(newcxt);
DatabaseList = DLNewList();
if (nelems > 0)
{
TimestampTz current_time;
int millis_increment;
avl_dbase *dbary;
avl_dbase *db;
HASH_SEQ_STATUS seq;
int i;
/* put all the hash elements into an array */
dbary = palloc(nelems * sizeof(avl_dbase));
i = 0;
hash_seq_init(&seq, dbhash);
while ((db = hash_seq_search(&seq)) != NULL)
memcpy(&(dbary[i++]), db, sizeof(avl_dbase));
/* sort the array */
qsort(dbary, nelems, sizeof(avl_dbase), db_comparator);
/* this is the time interval between databases in the schedule */
millis_increment = 1000.0 * autovacuum_naptime / nelems;
current_time = GetCurrentTimestamp();
/*
* move the elements from the array into the dllist, setting the
* next_worker while walking the array
*/
for (i = 0; i < nelems; i++)
{
avl_dbase *db = &(dbary[i]);
Dlelem *elem;
current_time = TimestampTzPlusMilliseconds(current_time,
millis_increment);
db->adl_next_worker = current_time;
elem = DLNewElem(db);
/* later elements should go closer to the head of the list */
DLAddHead(DatabaseList, elem);
}
}
/* all done, clean up memory */
if (DatabaseListCxt != NULL)
MemoryContextDelete(DatabaseListCxt);
MemoryContextDelete(tmpcxt);
DatabaseListCxt = newcxt;
MemoryContextSwitchTo(oldcxt);
}
/* qsort comparator for avl_dbase, using adl_score */
static int
db_comparator(const void *a, const void *b)
{
if (((avl_dbase *) a)->adl_score == ((avl_dbase *) b)->adl_score)
return 0;
else
return (((avl_dbase *) a)->adl_score < ((avl_dbase *) b)->adl_score) ? 1 : -1;
}
/*
* do_start_worker
*
* Bare-bones procedure for starting an autovacuum worker from the launcher.
* It determines what database to work on, sets up shared memory stuff and
* signals postmaster to start the worker.
* signals postmaster to start the worker. It fails gracefully if invoked when
* autovacuum_max_workers workers are already active.
*
* Return value is the OID of the database that the worker is going to process,
* or InvalidOid if no worker was actually started.
*/
static void
static Oid
do_start_worker(void)
{
List *dblist;
bool for_xid_wrap;
autovac_dbase *db;
ListCell *cell;
ListCell *cell;
TransactionId xidForceLimit;
bool for_xid_wrap;
avw_dbase *avdb;
TimestampTz current_time;
bool skipit = false;
/* return quickly when there are no free workers */
LWLockAcquire(AutovacuumLock, LW_SHARED);
if (AutoVacuumShmem->av_freeWorkers == INVALID_OFFSET)
{
LWLockRelease(AutovacuumLock);
return InvalidOid;
}
LWLockRelease(AutovacuumLock);
/* use fresh stats */
pgstat_clear_snapshot();
/* Get a list of databases */
dblist = autovac_get_database_list();
dblist = get_database_list();
/*
* Determine the oldest datfrozenxid/relfrozenxid that we will allow
......@@ -495,21 +989,23 @@ do_start_worker(void)
* isn't clear how to construct a metric that measures that and not cause
* starvation for less busy databases.
*/
db = NULL;
avdb = NULL;
for_xid_wrap = false;
current_time = GetCurrentTimestamp();
foreach(cell, dblist)
{
autovac_dbase *tmp = lfirst(cell);
avw_dbase *tmp = lfirst(cell);
Dlelem *elem;
/* Find pgstat entry if any */
tmp->ad_entry = pgstat_fetch_stat_dbentry(tmp->ad_datid);
tmp->adw_entry = pgstat_fetch_stat_dbentry(tmp->adw_datid);
/* Check to see if this one is at risk of wraparound */
if (TransactionIdPrecedes(tmp->ad_frozenxid, xidForceLimit))
if (TransactionIdPrecedes(tmp->adw_frozenxid, xidForceLimit))
{
if (db == NULL ||
TransactionIdPrecedes(tmp->ad_frozenxid, db->ad_frozenxid))
db = tmp;
if (avdb == NULL ||
TransactionIdPrecedes(tmp->adw_frozenxid, avdb->adw_frozenxid))
avdb = tmp;
for_xid_wrap = true;
continue;
}
......@@ -520,26 +1016,156 @@ do_start_worker(void)
* Otherwise, skip a database with no pgstat entry; it means it
* hasn't seen any activity.
*/
if (!tmp->ad_entry)
if (!tmp->adw_entry)
continue;
/*
* Also, skip a database that appears on the database list as having
* been processed recently (less than autovacuum_naptime seconds ago).
* We do this so that we don't select a database which we just
* selected, but that pgstat hasn't gotten around to updating the last
* autovacuum time yet.
*/
skipit = false;
elem = DatabaseList ? DLGetTail(DatabaseList) : NULL;
while (elem != NULL)
{
avl_dbase *dbp = DLE_VAL(elem);
if (dbp->adl_datid == tmp->adw_datid)
{
TimestampTz curr_plus_naptime;
TimestampTz next = dbp->adl_next_worker;
curr_plus_naptime =
TimestampTzPlusMilliseconds(current_time,
autovacuum_naptime * 1000);
/*
* What we want here is to skip if next_worker falls between
* the current time and the current time plus naptime.
*/
if (timestamp_cmp_internal(current_time, next) > 0)
skipit = false;
else if (timestamp_cmp_internal(next, curr_plus_naptime) > 0)
skipit = false;
else
skipit = true;
break;
}
elem = DLGetPred(elem);
}
if (skipit)
continue;
/*
* Remember the db with oldest autovac time. (If we are here,
* both tmp->entry and db->entry must be non-null.)
*/
if (db == NULL ||
tmp->ad_entry->last_autovac_time < db->ad_entry->last_autovac_time)
db = tmp;
if (avdb == NULL ||
tmp->adw_entry->last_autovac_time < avdb->adw_entry->last_autovac_time)
avdb = tmp;
}
/* Found a database -- process it */
if (db != NULL)
if (avdb != NULL)
{
WorkerInfo worker;
SHMEM_OFFSET sworker;
LWLockAcquire(AutovacuumLock, LW_EXCLUSIVE);
AutoVacuumShmem->process_db = db->ad_datid;
/*
* Get a worker entry from the freelist. We checked above, so there
* really should be a free slot -- complain very loudly if there isn't.
*/
sworker = AutoVacuumShmem->av_freeWorkers;
if (sworker == INVALID_OFFSET)
elog(FATAL, "no free worker found");
worker = (WorkerInfo) MAKE_PTR(sworker);
AutoVacuumShmem->av_freeWorkers = worker->wi_links.next;
worker->wi_dboid = avdb->adw_datid;
worker->wi_workerpid = 0;
worker->wi_launchtime = GetCurrentTimestamp();
AutoVacuumShmem->av_startingWorker = sworker;
LWLockRelease(AutovacuumLock);
SendPostmasterSignal(PMSIGNAL_START_AUTOVAC_WORKER);
return avdb->adw_datid;
}
else if (skipit)
{
/*
* If we skipped all databases on the list, rebuild it, because it
* probably contains a dropped database.
*/
rebuild_database_list(InvalidOid);
}
return InvalidOid;
}
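The skip test inside do_start_worker reduces to a closed-interval check: a database is skipped only when its next_worker time lies between the current time and the current time plus autovacuum_naptime. The sketch below restates that check with plain integers; `demo_timestamp` (microseconds) and `demo_should_skip` are hypothetical names, assumed here in place of TimestampTz and the in-line code.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

/* Hypothetical stand-in for TimestampTz: microseconds since some epoch. */
typedef int64_t demo_timestamp;

/*
 * Return true when next_worker falls within [now, now + naptime], which is
 * the window in which do_start_worker declines to pick the database again.
 */
static bool
demo_should_skip(demo_timestamp now, demo_timestamp next_worker,
				 int naptime_secs)
{
	demo_timestamp now_plus_naptime;

	assert(naptime_secs >= 0);
	now_plus_naptime = now + (demo_timestamp) naptime_secs * INT64_C(1000000);

	if (next_worker < now)
		return false;			/* scheduled time is already in the past */
	if (next_worker > now_plus_naptime)
		return false;			/* scheduled further ahead than one naptime */
	return true;
}
```

Both ends of the window are inclusive, which matches the two `> 0` comparisons in the loop above.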
/*
* launch_worker
*
* Wrapper for starting a worker from the launcher. Besides actually starting
* it, update the database list to reflect the next time that another one will
* need to be started on the selected database. The actual database choice is
* left to do_start_worker.
*
* This routine is also expected to insert an entry into the database list if
* the selected database was previously absent from the list. It returns the
* new database list.
*/
static void
launch_worker(TimestampTz now)
{
Oid dbid;
Dlelem *elem;
dbid = do_start_worker();
if (OidIsValid(dbid))
{
/*
* Walk the database list and update the corresponding entry. If the
* database is not on the list, we'll recreate the list.
*/
elem = (DatabaseList == NULL) ? NULL : DLGetHead(DatabaseList);
while (elem != NULL)
{
avl_dbase *avdb = DLE_VAL(elem);
if (avdb->adl_datid == dbid)
{
/*
* add autovacuum_naptime seconds to the current time, and use
* that as the new "next_worker" field for this database.
*/
avdb->adl_next_worker =
TimestampTzPlusMilliseconds(now, autovacuum_naptime * 1000);
DLMoveToFront(elem);
break;
}
elem = DLGetSucc(elem);
}
/*
* If the database was not present in the database list, we rebuild the
* list. It's possible that the database does not get into the list
* anyway, for example if it's a database that doesn't have a pgstat
* entry, but this is not a problem because we don't want to schedule
* workers regularly into those in any case.
*/
if (elem == NULL)
rebuild_database_list(dbid);
}
}
......@@ -550,6 +1176,13 @@ avl_sighup_handler(SIGNAL_ARGS)
got_SIGHUP = true;
}
/* SIGUSR1: a worker is up and running, or just finished */
static void
avl_sigusr1_handler(SIGNAL_ARGS)
{
got_SIGUSR1 = true;
}
static void
avlauncher_shutdown(SIGNAL_ARGS)
{
......@@ -665,7 +1298,7 @@ NON_EXEC_STATIC void
AutoVacWorkerMain(int argc, char *argv[])
{
sigjmp_buf local_sigjmp_buf;
Oid dbid;
Oid dbid = InvalidOid;
/* we are a postmaster subprocess now */
IsUnderPostmaster = true;
......@@ -763,18 +1396,35 @@ AutoVacWorkerMain(int argc, char *argv[])
SetConfigOption("zero_damaged_pages", "false", PGC_SUSET, PGC_S_OVERRIDE);
/*
* Get the database Id we're going to work on, and announce our PID
* in the shared memory area. We remove the database OID immediately
* from the shared memory area.
* Force statement_timeout to zero to avoid a timeout setting from
* preventing regular maintenance from being executed.
*/
LWLockAcquire(AutovacuumLock, LW_EXCLUSIVE);
SetConfigOption("statement_timeout", "0", PGC_SUSET, PGC_S_OVERRIDE);
dbid = AutoVacuumShmem->process_db;
AutoVacuumShmem->process_db = InvalidOid;
AutoVacuumShmem->worker_pid = MyProcPid;
/*
* Get the info about the database we're going to work on.
*/
LWLockAcquire(AutovacuumLock, LW_EXCLUSIVE);
MyWorkerInfo = (WorkerInfo) MAKE_PTR(AutoVacuumShmem->av_startingWorker);
dbid = MyWorkerInfo->wi_dboid;
MyWorkerInfo->wi_workerpid = MyProcPid;
/* insert into the running list */
SHMQueueInsertBefore(&AutoVacuumShmem->av_runningWorkers,
&MyWorkerInfo->wi_links);
/*
* remove from the "starting" pointer, so that the launcher can start a new
* worker if required
*/
AutoVacuumShmem->av_startingWorker = INVALID_OFFSET;
LWLockRelease(AutovacuumLock);
on_shmem_exit(FreeWorkerInfo, 0);
/* wake up the launcher */
if (AutoVacuumShmem->av_launcherpid != 0)
kill(AutoVacuumShmem->av_launcherpid, SIGUSR1);
if (OidIsValid(dbid))
{
char *dbname;
......@@ -803,7 +1453,7 @@ AutoVacWorkerMain(int argc, char *argv[])
/* Create the memory context where cross-transaction state is stored */
AutovacMemCxt = AllocSetContextCreate(TopMemoryContext,
"Autovacuum context",
"AV worker",
ALLOCSET_DEFAULT_MINSIZE,
ALLOCSET_DEFAULT_INITSIZE,
ALLOCSET_DEFAULT_MAXSIZE);
......@@ -814,25 +1464,152 @@ AutoVacWorkerMain(int argc, char *argv[])
}
/*
* Now remove our PID from shared memory, so that the launcher can start
* another worker as soon as appropriate.
* FIXME -- we need to notify the launcher when we are gone. But this
* should be done after our PGPROC is released, in ProcKill.
*/
LWLockAcquire(AutovacuumLock, LW_EXCLUSIVE);
AutoVacuumShmem->worker_pid = 0;
LWLockRelease(AutovacuumLock);
/* All done, go away */
proc_exit(0);
}
/*
* autovac_get_database_list
* Return a WorkerInfo to the free list */
static void
FreeWorkerInfo(int code, Datum arg)
{
if (MyWorkerInfo != NULL)
{
LWLockAcquire(AutovacuumLock, LW_EXCLUSIVE);
/*
* If this worker shuts down when there is no free worker slot, wake
* the launcher up so that he can launch a new worker immediately if
* required. We only save the launcher's PID in local memory here --
* the actual signal will be sent when the PGPROC is recycled, because
* that is when the new worker can actually be launched.
*
* We somewhat ignore the risk that the launcher changes its PID
* between our reading it and the actual kill; we expect ProcKill to be
* called shortly after us, and we assume that PIDs are not reused too
* quickly after a process exits.
*/
if (AutoVacuumShmem->av_freeWorkers == INVALID_OFFSET)
AutovacuumLauncherPid = AutoVacuumShmem->av_launcherpid;
SHMQueueDelete(&MyWorkerInfo->wi_links);
MyWorkerInfo->wi_links.next = AutoVacuumShmem->av_freeWorkers;
MyWorkerInfo->wi_dboid = InvalidOid;
MyWorkerInfo->wi_tableoid = InvalidOid;
MyWorkerInfo->wi_workerpid = 0;
MyWorkerInfo->wi_launchtime = 0;
MyWorkerInfo->wi_cost_delay = 0;
MyWorkerInfo->wi_cost_limit = 0;
MyWorkerInfo->wi_cost_limit_base = 0;
AutoVacuumShmem->av_freeWorkers = MAKE_OFFSET(MyWorkerInfo);
/* not mine anymore */
MyWorkerInfo = NULL;
/*
* now that we're inactive, cause a rebalancing of the surviving
* workers
*/
AutoVacuumShmem->av_rebalance = true;
LWLockRelease(AutovacuumLock);
}
}
/*
* Update the cost-based delay parameters, so that each of multiple workers
* consumes a fraction of the total available I/O.
*/
void
AutoVacuumUpdateDelay(void)
{
if (MyWorkerInfo)
{
VacuumCostDelay = MyWorkerInfo->wi_cost_delay;
VacuumCostLimit = MyWorkerInfo->wi_cost_limit;
}
}
/*
* autovac_balance_cost
* Recalculate the cost limit setting for each active worker.
*
* Caller must hold the AutovacuumLock in exclusive mode.
*/
static void
autovac_balance_cost(void)
{
WorkerInfo worker;
int vac_cost_limit = (autovacuum_vac_cost_limit >= 0 ?
autovacuum_vac_cost_limit : VacuumCostLimit);
int vac_cost_delay = (autovacuum_vac_cost_delay >= 0 ?
autovacuum_vac_cost_delay : VacuumCostDelay);
double cost_total;
double cost_avail;
/* not set? nothing to do */
if (vac_cost_limit <= 0 || vac_cost_delay <= 0)
return;
/* calculate the total base cost limit of active workers */
cost_total = 0.0;
worker = (WorkerInfo) SHMQueueNext(&AutoVacuumShmem->av_runningWorkers,
&AutoVacuumShmem->av_runningWorkers,
offsetof(WorkerInfoData, wi_links));
while (worker)
{
if (worker->wi_workerpid != 0 &&
worker->wi_cost_limit_base > 0 && worker->wi_cost_delay > 0)
cost_total +=
(double) worker->wi_cost_limit_base / worker->wi_cost_delay;
worker = (WorkerInfo) SHMQueueNext(&AutoVacuumShmem->av_runningWorkers,
&worker->wi_links,
offsetof(WorkerInfoData, wi_links));
}
/* there are no cost limits -- nothing to do */
if (cost_total <= 0)
return;
/*
* Adjust the cost limit of each active worker so that the combined
* limit stays within autovacuum_vacuum_cost_limit.
*/
cost_avail = (double) vac_cost_limit / vac_cost_delay;
worker = (WorkerInfo) SHMQueueNext(&AutoVacuumShmem->av_runningWorkers,
&AutoVacuumShmem->av_runningWorkers,
offsetof(WorkerInfoData, wi_links));
while (worker)
{
if (worker->wi_workerpid != 0 &&
worker->wi_cost_limit_base > 0 && worker->wi_cost_delay > 0)
{
int limit = (int)
(cost_avail * worker->wi_cost_limit_base / cost_total);
worker->wi_cost_limit = Min(limit, worker->wi_cost_limit_base);
elog(DEBUG2, "autovac_balance_cost(pid=%u db=%u, rel=%u, cost_limit=%d, cost_delay=%d)",
worker->wi_workerpid, worker->wi_dboid,
worker->wi_tableoid, worker->wi_cost_limit, worker->wi_cost_delay);
}
worker = (WorkerInfo) SHMQueueNext(&AutoVacuumShmem->av_runningWorkers,
&worker->wi_links,
offsetof(WorkerInfoData, wi_links));
}
}
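The balancing arithmetic in autovac_balance_cost can be tried outside the server. The sketch below works on a plain array instead of the shared-memory queue and takes the global limit and delay as arguments; `demo_worker`, `demo_participates` and `demo_balance_cost` are hypothetical names, and locking is omitted entirely.

```c
#include <assert.h>
#include <stddef.h>

/* Hypothetical stand-in for the cost fields of WorkerInfoData. */
typedef struct demo_worker
{
	int			pid;				/* 0 means the slot is not running */
	int			cost_limit_base;	/* limit requested for the table */
	int			cost_limit;			/* limit after balancing */
	int			cost_delay;
} demo_worker;

/* A worker takes part in balancing only if it runs with cost limits set. */
static int
demo_participates(const demo_worker *w)
{
	return w->pid != 0 && w->cost_limit_base > 0 && w->cost_delay > 0;
}

/* Scale each participating worker's limit to its share of the global rate. */
static void
demo_balance_cost(demo_worker *workers, size_t n,
				  int global_limit, int global_delay)
{
	double		cost_total = 0.0;
	double		cost_avail;
	size_t		i;

	assert(workers != NULL || n == 0);

	if (global_limit <= 0 || global_delay <= 0)
		return;					/* cost-based delay is not in use */

	for (i = 0; i < n; i++)
		if (demo_participates(&workers[i]))
			cost_total += (double) workers[i].cost_limit_base /
				workers[i].cost_delay;

	if (cost_total <= 0)
		return;					/* no worker has cost limits */

	cost_avail = (double) global_limit / global_delay;
	for (i = 0; i < n; i++)
	{
		if (demo_participates(&workers[i]))
		{
			int			base = workers[i].cost_limit_base;
			int			limit = (int) (cost_avail * base / cost_total);

			/* never raise a worker above the limit it asked for */
			workers[i].cost_limit = (limit < base) ? limit : base;
		}
	}
}
```

With a global limit of 200 and delay of 20, two running workers that each ask for 200/20 would end up with a limit of 100 apiece, while a single worker keeps its full 200.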
/*
* get_database_list
*
* Return a list of all databases. Note we cannot use pg_database,
* because we aren't connected; we use the flat database file.
*/
static List *
autovac_get_database_list(void)
get_database_list(void)
{
char *filename;
List *dblist = NIL;
......@@ -852,15 +1629,15 @@ autovac_get_database_list(void)
while (read_pg_database_line(db_file, thisname, &db_id,
&db_tablespace, &db_frozenxid))
{
autovac_dbase *avdb;
avw_dbase *avdb;
avdb = (autovac_dbase *) palloc(sizeof(autovac_dbase));
avdb = (avw_dbase *) palloc(sizeof(avw_dbase));
avdb->ad_datid = db_id;
avdb->ad_name = pstrdup(thisname);
avdb->ad_frozenxid = db_frozenxid;
avdb->adw_datid = db_id;
avdb->adw_name = pstrdup(thisname);
avdb->adw_frozenxid = db_frozenxid;
/* this gets set later: */
avdb->ad_entry = NULL;
avdb->adw_entry = NULL;
dblist = lappend(dblist, avdb);
}
......@@ -1008,12 +1785,12 @@ do_autovacuum(void)
* Add to the list of tables to vacuum, the OIDs of the tables that
* correspond to the saved OIDs of toast tables needing vacuum.
*/
foreach (cell, toast_oids)
foreach(cell, toast_oids)
{
Oid toastoid = lfirst_oid(cell);
ListCell *cell2;
foreach (cell2, table_toast_list)
foreach(cell2, table_toast_list)
{
av_relation *ar = lfirst(cell2);
......@@ -1038,9 +1815,55 @@ do_autovacuum(void)
Oid relid = lfirst_oid(cell);
autovac_table *tab;
char *relname;
WorkerInfo worker;
bool skipit;
CHECK_FOR_INTERRUPTS();
/*
* hold schedule lock from here until we're sure that this table
* still needs vacuuming. We also need the AutovacuumLock to walk
* the worker array, but we'll let go of that one quickly.
*/
LWLockAcquire(AutovacuumScheduleLock, LW_EXCLUSIVE);
LWLockAcquire(AutovacuumLock, LW_SHARED);
/*
* Check whether the table is being vacuumed concurrently by another
* worker.
*/
skipit = false;
worker = (WorkerInfo) SHMQueueNext(&AutoVacuumShmem->av_runningWorkers,
&AutoVacuumShmem->av_runningWorkers,
offsetof(WorkerInfoData, wi_links));
while (worker)
{
/* ignore myself */
if (worker == MyWorkerInfo)
goto next_worker;
/* ignore workers in other databases */
if (worker->wi_dboid != MyDatabaseId)
goto next_worker;
if (worker->wi_tableoid == relid)
{
skipit = true;
break;
}
next_worker:
worker = (WorkerInfo) SHMQueueNext(&AutoVacuumShmem->av_runningWorkers,
&worker->wi_links,
offsetof(WorkerInfoData, wi_links));
}
LWLockRelease(AutovacuumLock);
if (skipit)
{
LWLockRelease(AutovacuumScheduleLock);
continue;
}
/*
* Check whether pgstat data still says we need to vacuum this table.
* It could have changed if something else processed the table while we
......@@ -1053,11 +1876,18 @@ do_autovacuum(void)
if (tab == NULL)
{
/* someone else vacuumed the table */
LWLockRelease(AutovacuumScheduleLock);
continue;
}
/* Ok, good to go! */
/* Set the vacuum cost parameters for this table */
/*
* Ok, good to go. Store the table in shared memory before releasing
* the lock so that other workers don't vacuum it concurrently.
*/
MyWorkerInfo->wi_tableoid = relid;
LWLockRelease(AutovacuumScheduleLock);
/* Set the initial vacuum cost parameters for this table */
VacuumCostDelay = tab->at_vacuum_cost_delay;
VacuumCostLimit = tab->at_vacuum_cost_limit;
......@@ -1067,6 +1897,18 @@ do_autovacuum(void)
(tab->at_doanalyze ? " ANALYZE" : ""),
relname);
/*
* Advertise my cost delay parameters for the balancing algorithm, and
* do a balance
*/
LWLockAcquire(AutovacuumLock, LW_EXCLUSIVE);
MyWorkerInfo->wi_cost_delay = tab->at_vacuum_cost_delay;
MyWorkerInfo->wi_cost_limit = tab->at_vacuum_cost_limit;
MyWorkerInfo->wi_cost_limit_base = tab->at_vacuum_cost_limit;
autovac_balance_cost();
LWLockRelease(AutovacuumLock);
/* have at it */
autovacuum_do_vac_analyze(tab->at_relid,
tab->at_dovacuum,
tab->at_doanalyze,
......@@ -1211,7 +2053,7 @@ table_recheck_autovac(Oid relid)
PgStat_StatDBEntry *shared;
PgStat_StatDBEntry *dbentry;
/* We need fresh pgstat data for this */
/* use fresh stats */
pgstat_clear_snapshot();
shared = pgstat_fetch_stat_dbentry(InvalidOid);
......@@ -1219,8 +2061,8 @@ table_recheck_autovac(Oid relid)
/* fetch the relation's relcache entry */
classTup = SearchSysCacheCopy(RELOID,
ObjectIdGetDatum(relid),
0, 0, 0);
ObjectIdGetDatum(relid),
0, 0, 0);
if (!HeapTupleIsValid(classTup))
return NULL;
classForm = (Form_pg_class) GETSTRUCT(classTup);
......@@ -1630,7 +2472,16 @@ IsAutoVacuumWorkerProcess(void)
Size
AutoVacuumShmemSize(void)
{
return sizeof(AutoVacuumShmemStruct);
Size size;
/*
* Need the fixed struct and the array of WorkerInfoData.
*/
size = sizeof(AutoVacuumShmemStruct);
size = MAXALIGN(size);
size = add_size(size, mul_size(autovacuum_max_workers,
sizeof(WorkerInfoData)));
return size;
}
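The size computation above is an aligned fixed header followed by an array of per-worker slots, with add_size and mul_size guarding against overflow. A rough standalone equivalent is sketched below; the 8-byte alignment, `DEMO_MAXALIGN` and `demo_shmem_size` are assumptions made for illustration, and the sketch reports overflow through its return value rather than raising an error as the server routines do.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>

/* Assumed maximum alignment; the real value is platform dependent. */
#define DEMO_ALIGNOF 8
#define DEMO_MAXALIGN(len) \
	(((size_t) (len) + (DEMO_ALIGNOF - 1)) & ~((size_t) (DEMO_ALIGNOF - 1)))

/*
 * Compute aligned(header_size) + nslots * slot_size.  Returns false, leaving
 * *result untouched, if any step would overflow size_t.
 */
static bool
demo_shmem_size(size_t header_size, size_t slot_size, size_t nslots,
				size_t *result)
{
	size_t		aligned;
	size_t		array;

	assert(result != NULL);

	aligned = DEMO_MAXALIGN(header_size);
	if (aligned < header_size)
		return false;			/* rounding up wrapped around */
	if (slot_size != 0 && nslots > SIZE_MAX / slot_size)
		return false;			/* multiplication would overflow */
	array = slot_size * nslots;
	if (array > SIZE_MAX - aligned)
		return false;			/* addition would overflow */

	*result = aligned + array;
	return true;
}
```

For a 20-byte header and three 48-byte slots this yields 24 + 144 = 168 bytes under the assumed alignment.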
/*
......@@ -1650,8 +2501,29 @@ AutoVacuumShmemInit(void)
ereport(FATAL,
(errcode(ERRCODE_OUT_OF_MEMORY),
errmsg("not enough shared memory for autovacuum")));
if (found)
return; /* already initialized */
MemSet(AutoVacuumShmem, 0, sizeof(AutoVacuumShmemStruct));
if (!IsUnderPostmaster)
{
WorkerInfo worker;
int i;
Assert(!found);
AutoVacuumShmem->av_launcherpid = 0;
AutoVacuumShmem->av_freeWorkers = INVALID_OFFSET;
SHMQueueInit(&AutoVacuumShmem->av_runningWorkers);
AutoVacuumShmem->av_startingWorker = INVALID_OFFSET;
worker = (WorkerInfo) ((char *) AutoVacuumShmem +
MAXALIGN(sizeof(AutoVacuumShmemStruct)));
/* initialize the WorkerInfo free list */
for (i = 0; i < autovacuum_max_workers; i++)
{
worker[i].wi_links.next = AutoVacuumShmem->av_freeWorkers;
AutoVacuumShmem->av_freeWorkers = MAKE_OFFSET(&worker[i]);
}
}
else
Assert(found);
}
......@@ -8,7 +8,7 @@
*
*
* IDENTIFICATION
* $PostgreSQL: pgsql/src/backend/storage/lmgr/proc.c,v 1.187 2007/04/03 16:34:36 tgl Exp $
* $PostgreSQL: pgsql/src/backend/storage/lmgr/proc.c,v 1.188 2007/04/16 18:29:53 alvherre Exp $
*
*-------------------------------------------------------------------------
*/
......@@ -96,7 +96,7 @@ ProcGlobalShmemSize(void)
size = add_size(size, sizeof(PROC_HDR));
/* AuxiliaryProcs */
size = add_size(size, mul_size(NUM_AUXILIARY_PROCS, sizeof(PGPROC)));
/* MyProcs */
/* MyProcs, including autovacuum */
size = add_size(size, mul_size(MaxBackends, sizeof(PGPROC)));
/* ProcStructLock */
size = add_size(size, sizeof(slock_t));
......@@ -110,7 +110,10 @@ ProcGlobalShmemSize(void)
int
ProcGlobalSemas(void)
{
/* We need a sema per backend, plus one for each auxiliary process. */
/*
* We need a sema per backend (including autovacuum), plus one for each
* auxiliary process.
*/
return MaxBackends + NUM_AUXILIARY_PROCS;
}
......@@ -127,8 +130,8 @@ ProcGlobalSemas(void)
* running out when trying to start another backend is a common failure.
* So, now we grab enough semaphores to support the desired max number
* of backends immediately at initialization --- if the sysadmin has set
* MaxBackends higher than his kernel will support, he'll find out sooner
* rather than later.
* MaxConnections or autovacuum_max_workers higher than his kernel will
* support, he'll find out sooner rather than later.
*
* Another reason for creating semaphores here is that the semaphore
* implementation typically requires us to create semaphores in the
......@@ -163,25 +166,39 @@ InitProcGlobal(void)
* Initialize the data structures.
*/
ProcGlobal->freeProcs = INVALID_OFFSET;
ProcGlobal->autovacFreeProcs = INVALID_OFFSET;
ProcGlobal->spins_per_delay = DEFAULT_SPINS_PER_DELAY;
/*
* Pre-create the PGPROC structures and create a semaphore for each.
*/
procs = (PGPROC *) ShmemAlloc(MaxBackends * sizeof(PGPROC));
procs = (PGPROC *) ShmemAlloc((MaxConnections) * sizeof(PGPROC));
if (!procs)
ereport(FATAL,
(errcode(ERRCODE_OUT_OF_MEMORY),
errmsg("out of shared memory")));
MemSet(procs, 0, MaxBackends * sizeof(PGPROC));
for (i = 0; i < MaxBackends; i++)
MemSet(procs, 0, MaxConnections * sizeof(PGPROC));
for (i = 0; i < MaxConnections; i++)
{
PGSemaphoreCreate(&(procs[i].sem));
procs[i].links.next = ProcGlobal->freeProcs;
ProcGlobal->freeProcs = MAKE_OFFSET(&procs[i]);
}
procs = (PGPROC *) ShmemAlloc((autovacuum_max_workers) * sizeof(PGPROC));
if (!procs)
ereport(FATAL,
(errcode(ERRCODE_OUT_OF_MEMORY),
errmsg("out of shared memory")));
MemSet(procs, 0, autovacuum_max_workers * sizeof(PGPROC));
for (i = 0; i < autovacuum_max_workers; i++)
{
PGSemaphoreCreate(&(procs[i].sem));
procs[i].links.next = ProcGlobal->autovacFreeProcs;
ProcGlobal->autovacFreeProcs = MAKE_OFFSET(&procs[i]);
}
MemSet(AuxiliaryProcs, 0, NUM_AUXILIARY_PROCS * sizeof(PGPROC));
for (i = 0; i < NUM_AUXILIARY_PROCS; i++)
{
......@@ -226,12 +243,18 @@ InitProcess(void)
set_spins_per_delay(procglobal->spins_per_delay);
myOffset = procglobal->freeProcs;
if (IsAutoVacuumWorkerProcess())
myOffset = procglobal->autovacFreeProcs;
else
myOffset = procglobal->freeProcs;
if (myOffset != INVALID_OFFSET)
{
MyProc = (PGPROC *) MAKE_PTR(myOffset);
procglobal->freeProcs = MyProc->links.next;
if (IsAutoVacuumWorkerProcess())
procglobal->autovacFreeProcs = MyProc->links.next;
else
procglobal->freeProcs = MyProc->links.next;
SpinLockRelease(ProcStructLock);
}
else
......@@ -239,7 +262,8 @@ InitProcess(void)
/*
* If we reach here, all the PGPROCs are in use. This is one of the
* possible places to detect "too many backends", so give the standard
* error message.
* error message. XXX do we need to give a different failure message
* in the autovacuum case?
*/
SpinLockRelease(ProcStructLock);
ereport(FATAL,
......@@ -571,8 +595,16 @@ ProcKill(int code, Datum arg)
SpinLockAcquire(ProcStructLock);
/* Return PGPROC structure (and semaphore) to freelist */
MyProc->links.next = procglobal->freeProcs;
procglobal->freeProcs = MAKE_OFFSET(MyProc);
if (IsAutoVacuumWorkerProcess())
{
MyProc->links.next = procglobal->autovacFreeProcs;
procglobal->autovacFreeProcs = MAKE_OFFSET(MyProc);
}
else
{
MyProc->links.next = procglobal->freeProcs;
procglobal->freeProcs = MAKE_OFFSET(MyProc);
}
/* PGPROC struct isn't mine anymore */
MyProc = NULL;
......@@ -581,6 +613,10 @@ ProcKill(int code, Datum arg)
procglobal->spins_per_delay = update_spins_per_delay(procglobal->spins_per_delay);
SpinLockRelease(ProcStructLock);
/* wake autovac launcher if needed -- see comments in FreeWorkerInfo */
if (AutovacuumLauncherPid != 0)
kill(AutovacuumLauncherPid, SIGUSR1);
}
/*
......
......@@ -8,7 +8,7 @@
*
*
* IDENTIFICATION
* $PostgreSQL: pgsql/src/backend/utils/init/globals.c,v 1.100 2007/01/05 22:19:44 momjian Exp $
* $PostgreSQL: pgsql/src/backend/utils/init/globals.c,v 1.101 2007/04/16 18:29:54 alvherre Exp $
*
* NOTES
* Globals used all over the place should be declared here and not
......@@ -95,9 +95,14 @@ bool allowSystemTableMods = false;
int work_mem = 1024;
int maintenance_work_mem = 16384;
/* Primary determinants of sizes of shared-memory structures: */
/*
* Primary determinants of sizes of shared-memory structures. MaxBackends is
* MaxConnections + autovacuum_max_workers (it is computed by the GUC assign
* hook):
*/
int NBuffers = 1000;
int MaxBackends = 100;
int MaxConnections = 90;
int VacuumCostPageHit = 1; /* GUC parameters for vacuum */
int VacuumCostPageMiss = 10;
......
......@@ -10,7 +10,7 @@
* Written by Peter Eisentraut <peter_e@gmx.net>.
*
* IDENTIFICATION
* $PostgreSQL: pgsql/src/backend/utils/misc/guc.c,v 1.384 2007/04/12 06:53:47 neilc Exp $
* $PostgreSQL: pgsql/src/backend/utils/misc/guc.c,v 1.385 2007/04/16 18:29:55 alvherre Exp $
*
*--------------------------------------------------------------------
*/
......@@ -163,6 +163,8 @@ static bool assign_tcp_keepalives_count(int newval, bool doit, GucSource source)
static const char *show_tcp_keepalives_idle(void);
static const char *show_tcp_keepalives_interval(void);
static const char *show_tcp_keepalives_count(void);
static bool assign_autovacuum_max_workers(int newval, bool doit, GucSource source);
static bool assign_maxconnections(int newval, bool doit, GucSource source);
/*
* GUC option variables that are exported from this module
......@@ -1149,16 +1151,19 @@ static struct config_int ConfigureNamesInt[] =
* number.
*
* MaxBackends is limited to INT_MAX/4 because some places compute
* 4*MaxBackends without any overflow check. Likewise we have to limit
* NBuffers to INT_MAX/2.
* 4*MaxBackends without any overflow check. This check is made on
* assign_maxconnections, since MaxBackends is computed as MaxConnections +
* autovacuum_max_workers.
*
* Likewise we have to limit NBuffers to INT_MAX/2.
*/
{
{"max_connections", PGC_POSTMASTER, CONN_AUTH_SETTINGS,
gettext_noop("Sets the maximum number of concurrent connections."),
NULL
},
&MaxBackends,
100, 1, INT_MAX / 4, NULL, NULL
&MaxConnections,
100, 1, INT_MAX / 4, assign_maxconnections, NULL
},
{
......@@ -1622,6 +1627,15 @@ static struct config_int ConfigureNamesInt[] =
&autovacuum_freeze_max_age,
200000000, 100000000, 2000000000, NULL, NULL
},
{
/* see max_connections */
{"autovacuum_max_workers", PGC_POSTMASTER, AUTOVACUUM,
gettext_noop("Sets the maximum number of simultaneously running autovacuum worker processes."),
NULL
},
&autovacuum_max_workers,
3, 1, INT_MAX / 4, assign_autovacuum_max_workers, NULL
},
{
{"tcp_keepalives_idle", PGC_USERSET, CLIENT_CONN_OTHER,
......@@ -6692,5 +6706,32 @@ show_tcp_keepalives_count(void)
return nbuf;
}
static bool
assign_maxconnections(int newval, bool doit, GucSource source)
{
if (doit)
{
if (newval + autovacuum_max_workers > INT_MAX / 4)
return false;
MaxBackends = newval + autovacuum_max_workers;
}
return true;
}
static bool
assign_autovacuum_max_workers(int newval, bool doit, GucSource source)
{
if (doit)
{
if (newval + MaxConnections > INT_MAX / 4)
return false;
MaxBackends = newval + MaxConnections;
}
return true;
}
#include "guc-file.c"
......@@ -376,6 +376,7 @@
#autovacuum = on # enable autovacuum subprocess?
# 'on' requires stats_start_collector
# and stats_row_level to also be on
#autovacuum_max_workers = 3 # max # of autovacuum subprocesses
#autovacuum_naptime = 1min # time between autovacuum runs
#autovacuum_vacuum_threshold = 500 # min # of tuple updates before
# vacuum
......
......@@ -13,7 +13,7 @@
* Portions Copyright (c) 1996-2007, PostgreSQL Global Development Group
* Portions Copyright (c) 1994, Regents of the University of California
*
* $PostgreSQL: pgsql/src/include/miscadmin.h,v 1.193 2007/03/01 14:52:04 petere Exp $
* $PostgreSQL: pgsql/src/include/miscadmin.h,v 1.194 2007/04/16 18:29:56 alvherre Exp $
*
* NOTES
* some of the information in this file should be moved to other files.
......@@ -129,6 +129,7 @@ extern DLLIMPORT char *DataDir;
extern DLLIMPORT int NBuffers;
extern int MaxBackends;
extern int MaxConnections;
extern DLLIMPORT int MyProcPid;
extern DLLIMPORT struct Port *MyProcPort;
......
......@@ -7,15 +7,18 @@
* Portions Copyright (c) 1996-2007, PostgreSQL Global Development Group
* Portions Copyright (c) 1994, Regents of the University of California
*
* $PostgreSQL: pgsql/src/include/postmaster/autovacuum.h,v 1.8 2007/02/15 23:23:23 alvherre Exp $
* $PostgreSQL: pgsql/src/include/postmaster/autovacuum.h,v 1.9 2007/04/16 18:30:03 alvherre Exp $
*
*-------------------------------------------------------------------------
*/
#ifndef AUTOVACUUM_H
#define AUTOVACUUM_H
#include "storage/lock.h"
/* GUC variables */
extern bool autovacuum_start_daemon;
extern int autovacuum_max_workers;
extern int autovacuum_naptime;
extern int autovacuum_vac_thresh;
extern double autovacuum_vac_scale;
......@@ -25,6 +28,9 @@ extern int autovacuum_freeze_max_age;
extern int autovacuum_vac_cost_delay;
extern int autovacuum_vac_cost_limit;
/* autovacuum launcher PID, only valid when worker is shutting down */
extern int AutovacuumLauncherPid;
/* Status inquiry functions */
extern bool AutoVacuumingActive(void);
extern bool IsAutoVacuumLauncherProcess(void);
......@@ -35,6 +41,9 @@ extern void autovac_init(void);
extern int StartAutoVacLauncher(void);
extern int StartAutoVacWorker(void);
/* autovacuum cost-delay balancer */
extern void AutoVacuumUpdateDelay(void);
#ifdef EXEC_BACKEND
extern void AutoVacLauncherMain(int argc, char *argv[]);
extern void AutoVacWorkerMain(int argc, char *argv[]);
......
......@@ -7,7 +7,7 @@
* Portions Copyright (c) 1996-2007, PostgreSQL Global Development Group
* Portions Copyright (c) 1994, Regents of the University of California
*
* $PostgreSQL: pgsql/src/include/storage/lwlock.h,v 1.35 2007/04/03 16:34:36 tgl Exp $
* $PostgreSQL: pgsql/src/include/storage/lwlock.h,v 1.36 2007/04/16 18:30:04 alvherre Exp $
*
*-------------------------------------------------------------------------
*/
......@@ -61,6 +61,7 @@ typedef enum LWLockId
BtreeVacuumLock,
AddinShmemInitLock,
AutovacuumLock,
AutovacuumScheduleLock,
/* Individual lock IDs end here */
FirstBufMappingLock,
FirstLockMgrLock = FirstBufMappingLock + NUM_BUFFER_PARTITIONS,
......
......@@ -7,7 +7,7 @@
* Portions Copyright (c) 1996-2007, PostgreSQL Global Development Group
* Portions Copyright (c) 1994, Regents of the University of California
*
* $PostgreSQL: pgsql/src/include/storage/proc.h,v 1.97 2007/04/03 16:34:36 tgl Exp $
* $PostgreSQL: pgsql/src/include/storage/proc.h,v 1.98 2007/04/16 18:30:04 alvherre Exp $
*
*-------------------------------------------------------------------------
*/
......@@ -115,6 +115,8 @@ typedef struct PROC_HDR
{
/* Head of list of free PGPROC structures */
SHMEM_OFFSET freeProcs;
/* Head of list of autovacuum's free PGPROC structures */
SHMEM_OFFSET autovacFreeProcs;
/* Current shared estimate of appropriate spins_per_delay value */
int spins_per_delay;
} PROC_HDR;
......