Commit 40d964ec authored by Amit Kapila's avatar Amit Kapila

Allow vacuum command to process indexes in parallel.

This feature allows the vacuum to leverage multiple CPUs in order to
process indexes.  This enables us to perform index vacuuming and index
cleanup with background workers.  This adds a PARALLEL option to VACUUM
command where the user can specify the number of workers that can be used
to perform the command which is limited by the number of indexes on a
table.  Specifying zero as a number of workers will disable parallelism.
This option can't be used with the FULL option.

Each index is processed by at most one vacuum process.  Therefore parallel
vacuum can be used when the table has at least two indexes.

The parallel degree is either specified by the user or determined based on
the number of indexes that the table has, and further limited by
max_parallel_maintenance_workers.  The index can participate in parallel
vacuum iff it's size is greater than min_parallel_index_scan_size.

Author: Masahiko Sawada and Amit Kapila
Reviewed-by: Dilip Kumar, Amit Kapila, Robert Haas, Tomas Vondra,
Mahendra Singh and Sergei Kornilov
Tested-by: Mahendra Singh and Prabhat Sahu
Discussion:
https://postgr.es/m/CAD21AoDTPMgzSkV4E3SFo1CH_x50bf5PqZFQf4jmqjk-C03BWg@mail.gmail.com
https://postgr.es/m/CAA4eK1J-VoR9gzS5E75pcD-OH0mEyCdp8RihcwKrcuw7J-Q0+w@mail.gmail.com
parent 44f1fc8d
......@@ -2308,13 +2308,13 @@ include_dir 'conf.d'
<listitem>
<para>
Sets the maximum number of parallel workers that can be
started by a single utility command. Currently, the only
parallel utility command that supports the use of parallel
workers is <command>CREATE INDEX</command>, and only when
building a B-tree index. Parallel workers are taken from the
pool of processes established by <xref
linkend="guc-max-worker-processes"/>, limited by <xref
linkend="guc-max-parallel-workers"/>. Note that the requested
started by a single utility command. Currently, the parallel
utility commands that support the use of parallel workers are
<command>CREATE INDEX</command> only when building a B-tree index,
and <command>VACUUM</command> without <literal>FULL</literal>
option. Parallel workers are taken from the pool of processes
established by <xref linkend="guc-max-worker-processes"/>, limited
by <xref linkend="guc-max-parallel-workers"/>. Note that the requested
number of workers may not actually be available at run time.
If this occurs, the utility operation will run with fewer
workers than expected. The default value is 2. Setting this
......@@ -4915,7 +4915,9 @@ ANY <replaceable class="parameter">num_sync</replaceable> ( <replaceable class="
for a parallel scan to be considered. Note that a parallel index scan
typically won't touch the entire index; it is the number of pages
which the planner believes will actually be touched by the scan which
is relevant.
is relevant. This parameter is also used to decide whether a
particular index can participate in a parallel vacuum. See
<xref linkend="sql-vacuum"/>.
If this value is specified without units, it is taken as blocks,
that is <symbol>BLCKSZ</symbol> bytes, typically 8kB.
The default is 512 kilobytes (<literal>512kB</literal>).
......
......@@ -34,6 +34,7 @@ VACUUM [ FULL ] [ FREEZE ] [ VERBOSE ] [ ANALYZE ] [ <replaceable class="paramet
SKIP_LOCKED [ <replaceable class="parameter">boolean</replaceable> ]
INDEX_CLEANUP [ <replaceable class="parameter">boolean</replaceable> ]
TRUNCATE [ <replaceable class="parameter">boolean</replaceable> ]
PARALLEL <replaceable class="parameter">integer</replaceable>
<phrase>and <replaceable class="parameter">table_and_columns</replaceable> is:</phrase>
......@@ -75,10 +76,14 @@ VACUUM [ FULL ] [ FREEZE ] [ VERBOSE ] [ ANALYZE ] [ <replaceable class="paramet
with normal reading and writing of the table, as an exclusive lock
is not obtained. However, extra space is not returned to the operating
system (in most cases); it's just kept available for re-use within the
same table. <command>VACUUM FULL</command> rewrites the entire contents
of the table into a new disk file with no extra space, allowing unused
space to be returned to the operating system. This form is much slower and
requires an exclusive lock on each table while it is being processed.
same table. It also allows us to leverage multiple CPUs in order to process
indexes. This feature is known as <firstterm>parallel vacuum</firstterm>.
To disable this feature, one can use <literal>PARALLEL</literal> option and
specify parallel workers as zero. <command>VACUUM FULL</command> rewrites
the entire contents of the table into a new disk file with no extra space,
allowing unused space to be returned to the operating system. This form is
much slower and requires an exclusive lock on each table while it is being
processed.
</para>
<para>
......@@ -223,6 +228,33 @@ VACUUM [ FULL ] [ FREEZE ] [ VERBOSE ] [ ANALYZE ] [ <replaceable class="paramet
</listitem>
</varlistentry>
<varlistentry>
<term><literal>PARALLEL</literal></term>
<listitem>
<para>
Perform vacuum index and cleanup index phases of <command>VACUUM</command>
in parallel using <replaceable class="parameter">integer</replaceable>
background workers (for the detail of each vacuum phases, please
refer to <xref linkend="vacuum-phases"/>). If the
<literal>PARALLEL</literal> option is omitted, then
<command>VACUUM</command> decides the number of workers based on number
of indexes that support parallel vacuum operation on the relation which
is further limited by <xref linkend="guc-max-parallel-workers-maintenance"/>.
The index can participate in a parallel vacuum if and only if the size
of the index is more than <xref linkend="guc-min-parallel-index-scan-size"/>.
Please note that it is not guaranteed that the number of parallel workers
specified in <replaceable class="parameter">integer</replaceable> will
be used during execution. It is possible for a vacuum to run with fewer
workers than specified, or even with no workers at all. Only one worker
can be used per index. So parallel workers are launched only when there
are at least <literal>2</literal> indexes in the table. Workers for
vacuum launches before starting each phase and exit at the end of
the phase. These behaviors might change in a future release. This
option can't be used with the <literal>FULL</literal> option.
</para>
</listitem>
</varlistentry>
<varlistentry>
<term><replaceable class="parameter">boolean</replaceable></term>
<listitem>
......@@ -237,6 +269,15 @@ VACUUM [ FULL ] [ FREEZE ] [ VERBOSE ] [ ANALYZE ] [ <replaceable class="paramet
</listitem>
</varlistentry>
<varlistentry>
<term><replaceable class="parameter">integer</replaceable></term>
<listitem>
<para>
Specifies a non-negative integer value passed to the selected option.
</para>
</listitem>
</varlistentry>
<varlistentry>
<term><replaceable class="parameter">table_name</replaceable></term>
<listitem>
......@@ -316,11 +357,19 @@ VACUUM [ FULL ] [ FREEZE ] [ VERBOSE ] [ ANALYZE ] [ <replaceable class="paramet
more than a plain <command>VACUUM</command> would.
</para>
<para>
The <option>PARALLEL</option> option is used only for vacuum purpose.
Even if this option is specified with <option>ANALYZE</option> option
it does not affect <option>ANALYZE</option>.
</para>
<para>
<command>VACUUM</command> causes a substantial increase in I/O traffic,
which might cause poor performance for other active sessions. Therefore,
it is sometimes advisable to use the cost-based vacuum delay feature.
See <xref linkend="runtime-config-resource-vacuum-cost"/> for details.
it is sometimes advisable to use the cost-based vacuum delay feature. For
parallel vacuum, each worker sleeps proportional to the work done by that
worker. See <xref linkend="runtime-config-resource-vacuum-cost"/> for
details.
</para>
<para>
......
......@@ -22,6 +22,20 @@
* of index scans performed. So we don't use maintenance_work_mem memory for
* the TID array, just enough to hold as many heap tuples as fit on one page.
*
* Lazy vacuum supports parallel execution with parallel worker processes. In
* a parallel vacuum, we perform both index vacuum and index cleanup with
* parallel worker processes. Individual indexes are processed by one vacuum
* process. At the beginning of a lazy vacuum (at lazy_scan_heap) we prepare
* the parallel context and initialize the DSM segment that contains shared
* information as well as the memory space for storing dead tuples. When
* starting either index vacuum or index cleanup, we launch parallel worker
* processes. Once all indexes are processed the parallel worker processes
* exit. After that, the leader process re-initializes the parallel context
* so that it can use the same DSM for multiple passes of index vacuum and
* for performing index cleanup. For updating the index statistics, we need
* to update the system table and since updates are not allowed during
* parallel mode we update the index statistics after exiting from the
* parallel mode.
*
* Portions Copyright (c) 1996-2020, PostgreSQL Global Development Group
* Portions Copyright (c) 1994, Regents of the University of California
......@@ -36,25 +50,30 @@
#include <math.h>
#include "access/amapi.h"
#include "access/genam.h"
#include "access/heapam.h"
#include "access/heapam_xlog.h"
#include "access/htup_details.h"
#include "access/multixact.h"
#include "access/parallel.h"
#include "access/transam.h"
#include "access/visibilitymap.h"
#include "access/xact.h"
#include "access/xlog.h"
#include "catalog/storage.h"
#include "commands/dbcommands.h"
#include "commands/progress.h"
#include "commands/vacuum.h"
#include "miscadmin.h"
#include "optimizer/paths.h"
#include "pgstat.h"
#include "portability/instr_time.h"
#include "postmaster/autovacuum.h"
#include "storage/bufmgr.h"
#include "storage/freespace.h"
#include "storage/lmgr.h"
#include "tcop/tcopprot.h"
#include "utils/lsyscache.h"
#include "utils/memutils.h"
#include "utils/pg_rusage.h"
......@@ -110,6 +129,142 @@
*/
#define PREFETCH_SIZE ((BlockNumber) 32)
/*
* DSM keys for parallel vacuum. Unlike other parallel execution code, since
* we don't need to worry about DSM keys conflicting with plan_node_id we can
* use small integers.
*/
#define PARALLEL_VACUUM_KEY_SHARED 1
#define PARALLEL_VACUUM_KEY_DEAD_TUPLES 2
#define PARALLEL_VACUUM_KEY_QUERY_TEXT 3
/*
* Macro to check if we are in a parallel vacuum. If true, we are in the
* parallel mode and the DSM segment is initialized.
*/
#define ParallelVacuumIsActive(lps) PointerIsValid(lps)
/*
* LVDeadTuples stores the dead tuple TIDs collected during the heap scan.
* This is allocated in the DSM segment in parallel mode and in local memory
* in non-parallel mode.
*/
typedef struct LVDeadTuples
{
int max_tuples; /* # slots allocated in array */
int num_tuples; /* current # of entries */
/* List of TIDs of tuples we intend to delete */
/* NB: this list is ordered by TID address */
ItemPointerData itemptrs[FLEXIBLE_ARRAY_MEMBER]; /* array of
* ItemPointerData */
} LVDeadTuples;
#define SizeOfLVDeadTuples(cnt) \
add_size((offsetof(LVDeadTuples, itemptrs)), \
mul_size(sizeof(ItemPointerData), cnt))
/*
* Shared information among parallel workers. So this is allocated in the DSM
* segment.
*/
typedef struct LVShared
{
/*
* Target table relid and log level. These fields are not modified during
* the lazy vacuum.
*/
Oid relid;
int elevel;
/*
* An indication for vacuum workers to perform either index vacuum or
* index cleanup. first_time is true only if for_cleanup is true and
* bulk-deletion is not performed yet.
*/
bool for_cleanup;
bool first_time;
/*
* Fields for both index vacuum and cleanup.
*
* reltuples is the total number of input heap tuples. We set either old
* live tuples in the index vacuum case or the new live tuples in the
* index cleanup case.
*
* estimated_count is true if the reltuples is an estimated value.
*/
double reltuples;
bool estimated_count;
/*
* In single process lazy vacuum we could consume more memory during index
* vacuuming or cleanup apart from the memory for heap scanning. In
* parallel vacuum, since individual vacuum workers can consume memory
* equal to maintenance_work_mem, the new maintenance_work_mem for each
* worker is set such that the parallel operation doesn't consume more
* memory than single process lazy vacuum.
*/
int maintenance_work_mem_worker;
/*
* Shared vacuum cost balance. During parallel vacuum,
* VacuumSharedCostBalance points to this value and it accumulates the
* balance of each parallel vacuum worker.
*/
pg_atomic_uint32 cost_balance;
/*
* Number of active parallel workers. This is used for computing the
* minimum threshold of the vacuum cost balance for a worker to go for the
* delay.
*/
pg_atomic_uint32 active_nworkers;
/*
* Variables to control parallel vacuum. We have a bitmap to indicate
* which index has stats in shared memory. The set bit in the map
* indicates that the particular index supports a parallel vacuum.
*/
pg_atomic_uint32 idx; /* counter for vacuuming and clean up */
uint32 offset; /* sizeof header incl. bitmap */
bits8 bitmap[FLEXIBLE_ARRAY_MEMBER]; /* bit map of NULLs */
/* Shared index statistics data follows at end of struct */
} LVShared;
#define SizeOfLVShared (offsetof(LVShared, bitmap) + sizeof(bits8))
#define GetSharedIndStats(s) \
((LVSharedIndStats *)((char *)(s) + ((LVShared *)(s))->offset))
#define IndStatsIsNull(s, i) \
(!(((LVShared *)(s))->bitmap[(i) >> 3] & (1 << ((i) & 0x07))))
/*
* Struct for an index bulk-deletion statistic used for parallel vacuum. This
* is allocated in the DSM segment.
*/
typedef struct LVSharedIndStats
{
bool updated; /* are the stats updated? */
IndexBulkDeleteResult stats;
} LVSharedIndStats;
/* Struct for maintaining a parallel vacuum state. */
typedef struct LVParallelState
{
ParallelContext *pcxt;
/* Shared information among parallel vacuum workers */
LVShared *lvshared;
/*
* The number of indexes that support parallel index bulk-deletion and
* parallel index cleanup respectively.
*/
int nindexes_parallel_bulkdel;
int nindexes_parallel_cleanup;
int nindexes_parallel_condcleanup;
} LVParallelState;
typedef struct LVRelStats
{
/* useindex = true means two-pass strategy; false means one-pass */
......@@ -128,11 +283,7 @@ typedef struct LVRelStats
BlockNumber pages_removed;
double tuples_deleted;
BlockNumber nonempty_pages; /* actually, last nonempty page + 1 */
/* List of TIDs of tuples we intend to delete */
/* NB: this list is ordered by TID address */
int num_dead_tuples; /* current # of entries */
int max_dead_tuples; /* # slots allocated in array */
ItemPointer dead_tuples; /* array of ItemPointerData */
LVDeadTuples *dead_tuples;
int num_index_scans;
TransactionId latestRemovedXid;
bool lock_waiter_detected;
......@@ -155,15 +306,15 @@ static void lazy_scan_heap(Relation onerel, VacuumParams *params,
bool aggressive);
static void lazy_vacuum_heap(Relation onerel, LVRelStats *vacrelstats);
static bool lazy_check_needs_freeze(Buffer buf, bool *hastup);
static void lazy_vacuum_index(Relation indrel,
IndexBulkDeleteResult **stats,
LVRelStats *vacrelstats);
static void lazy_vacuum_all_indexes(Relation onerel, LVRelStats *vacrelstats,
Relation *Irel, int nindexes,
IndexBulkDeleteResult **indstats);
static void lazy_vacuum_all_indexes(Relation onerel, Relation *Irel,
IndexBulkDeleteResult **stats,
LVRelStats *vacrelstats, LVParallelState *lps,
int nindexes);
static void lazy_vacuum_index(Relation indrel, IndexBulkDeleteResult **stats,
LVDeadTuples *dead_tuples, double reltuples);
static void lazy_cleanup_index(Relation indrel,
IndexBulkDeleteResult *stats,
LVRelStats *vacrelstats);
IndexBulkDeleteResult **stats,
double reltuples, bool estimated_count);
static int lazy_vacuum_page(Relation onerel, BlockNumber blkno, Buffer buffer,
int tupindex, LVRelStats *vacrelstats, Buffer *vmbuffer);
static bool should_attempt_truncation(VacuumParams *params,
......@@ -172,12 +323,41 @@ static void lazy_truncate_heap(Relation onerel, LVRelStats *vacrelstats);
static BlockNumber count_nondeletable_pages(Relation onerel,
LVRelStats *vacrelstats);
static void lazy_space_alloc(LVRelStats *vacrelstats, BlockNumber relblocks);
static void lazy_record_dead_tuple(LVRelStats *vacrelstats,
static void lazy_record_dead_tuple(LVDeadTuples *dead_tuples,
ItemPointer itemptr);
static bool lazy_tid_reaped(ItemPointer itemptr, void *state);
static int vac_cmp_itemptr(const void *left, const void *right);
static bool heap_page_is_all_visible(Relation rel, Buffer buf,
TransactionId *visibility_cutoff_xid, bool *all_frozen);
static void lazy_parallel_vacuum_indexes(Relation *Irel, IndexBulkDeleteResult **stats,
LVRelStats *vacrelstats, LVParallelState *lps,
int nindexes);
static void parallel_vacuum_index(Relation *Irel, IndexBulkDeleteResult **stats,
LVShared *lvshared, LVDeadTuples *dead_tuples,
int nindexes);
static void vacuum_indexes_leader(Relation *Irel, IndexBulkDeleteResult **stats,
LVRelStats *vacrelstats, LVParallelState *lps,
int nindexes);
static void vacuum_one_index(Relation indrel, IndexBulkDeleteResult **stats,
LVShared *lvshared, LVSharedIndStats *shared_indstats,
LVDeadTuples *dead_tuples);
static void lazy_cleanup_all_indexes(Relation *Irel, IndexBulkDeleteResult **stats,
LVRelStats *vacrelstats, LVParallelState *lps,
int nindexes);
static long compute_max_dead_tuples(BlockNumber relblocks, bool hasindex);
static int compute_parallel_vacuum_workers(Relation *Irel, int nindexes, int nrequested,
bool *can_parallel_vacuum);
static void prepare_index_statistics(LVShared *lvshared, bool *can_parallel_vacuum,
int nindexes);
static void update_index_statistics(Relation *Irel, IndexBulkDeleteResult **stats,
int nindexes);
static LVParallelState *begin_parallel_vacuum(Oid relid, Relation *Irel,
LVRelStats *vacrelstats, BlockNumber nblocks,
int nindexes, int nrequested);
static void end_parallel_vacuum(Relation *Irel, IndexBulkDeleteResult **stats,
LVParallelState *lps, int nindexes);
static LVSharedIndStats *get_indstats(LVShared *lvshared, int n);
static bool skip_parallel_vacuum_index(Relation indrel, LVShared *lvshared);
/*
......@@ -491,6 +671,18 @@ vacuum_log_cleanup_info(Relation rel, LVRelStats *vacrelstats)
* dead-tuple TIDs, invoke vacuuming of indexes and call lazy_vacuum_heap
* to reclaim dead line pointers.
*
* If the table has at least two indexes, we execute both index vacuum
* and index cleanup with parallel workers unless the parallel vacuum is
* disabled. In a parallel vacuum, we enter parallel mode and then
* create both the parallel context and the DSM segment before starting
* heap scan so that we can record dead tuples to the DSM segment. All
* parallel workers are launched at beginning of index vacuuming and
* index cleanup and they exit once done with all indexes. At the end of
* this function we exit from parallel mode. Index bulk-deletion results
* are stored in the DSM segment and we update index statistics for all
* the indexes after exiting from parallel mode since writes are not
* allowed during parallel mode.
*
* If there are no indexes then we can reclaim line pointers on the fly;
* dead line pointers need only be retained until all index pointers that
* reference them have been killed.
......@@ -499,6 +691,8 @@ static void
lazy_scan_heap(Relation onerel, VacuumParams *params, LVRelStats *vacrelstats,
Relation *Irel, int nindexes, bool aggressive)
{
LVParallelState *lps = NULL;
LVDeadTuples *dead_tuples;
BlockNumber nblocks,
blkno;
HeapTupleData tuple;
......@@ -556,13 +750,48 @@ lazy_scan_heap(Relation onerel, VacuumParams *params, LVRelStats *vacrelstats,
vacrelstats->nonempty_pages = 0;
vacrelstats->latestRemovedXid = InvalidTransactionId;
lazy_space_alloc(vacrelstats, nblocks);
/*
* Initialize the state for a parallel vacuum. As of now, only one worker
* can be used for an index, so we invoke parallelism only if there are at
* least two indexes on a table.
*/
if (params->nworkers >= 0 && vacrelstats->useindex && nindexes > 1)
{
/*
* Since parallel workers cannot access data in temporary tables, we
* can't perform parallel vacuum on them.
*/
if (RelationUsesLocalBuffers(onerel))
{
/*
* Give warning only if the user explicitly tries to perform a
* parallel vacuum on the temporary table.
*/
if (params->nworkers > 0)
ereport(WARNING,
(errmsg("disabling parallel option of vacuum on \"%s\" --- cannot vacuum temporary tables in parallel",
RelationGetRelationName(onerel))));
}
else
lps = begin_parallel_vacuum(RelationGetRelid(onerel), Irel,
vacrelstats, nblocks, nindexes,
params->nworkers);
}
/*
* Allocate the space for dead tuples in case the parallel vacuum is not
* initialized.
*/
if (!ParallelVacuumIsActive(lps))
lazy_space_alloc(vacrelstats, nblocks);
dead_tuples = vacrelstats->dead_tuples;
frozen = palloc(sizeof(xl_heap_freeze_tuple) * MaxHeapTuplesPerPage);
/* Report that we're scanning the heap, advertising total # of blocks */
initprog_val[0] = PROGRESS_VACUUM_PHASE_SCAN_HEAP;
initprog_val[1] = nblocks;
initprog_val[2] = vacrelstats->max_dead_tuples;
initprog_val[2] = dead_tuples->max_tuples;
pgstat_progress_update_multi_param(3, initprog_index, initprog_val);
/*
......@@ -740,8 +969,8 @@ lazy_scan_heap(Relation onerel, VacuumParams *params, LVRelStats *vacrelstats,
* If we are close to overrunning the available space for dead-tuple
* TIDs, pause and do a cycle of vacuuming before we tackle this page.
*/
if ((vacrelstats->max_dead_tuples - vacrelstats->num_dead_tuples) < MaxHeapTuplesPerPage &&
vacrelstats->num_dead_tuples > 0)
if ((dead_tuples->max_tuples - dead_tuples->num_tuples) < MaxHeapTuplesPerPage &&
dead_tuples->num_tuples > 0)
{
/*
* Before beginning index vacuuming, we release any pin we may
......@@ -756,8 +985,8 @@ lazy_scan_heap(Relation onerel, VacuumParams *params, LVRelStats *vacrelstats,
}
/* Work on all the indexes, then the heap */
lazy_vacuum_all_indexes(onerel, vacrelstats, Irel,
nindexes, indstats);
lazy_vacuum_all_indexes(onerel, Irel, indstats,
vacrelstats, lps, nindexes);
/* Remove tuples from heap */
lazy_vacuum_heap(onerel, vacrelstats);
......@@ -767,7 +996,7 @@ lazy_scan_heap(Relation onerel, VacuumParams *params, LVRelStats *vacrelstats,
* not to reset latestRemovedXid since we want that value to be
* valid.
*/
vacrelstats->num_dead_tuples = 0;
dead_tuples->num_tuples = 0;
/*
* Vacuum the Free Space Map to make newly-freed space visible on
......@@ -962,7 +1191,7 @@ lazy_scan_heap(Relation onerel, VacuumParams *params, LVRelStats *vacrelstats,
has_dead_tuples = false;
nfrozen = 0;
hastup = false;
prev_dead_count = vacrelstats->num_dead_tuples;
prev_dead_count = dead_tuples->num_tuples;
maxoff = PageGetMaxOffsetNumber(page);
/*
......@@ -1001,7 +1230,7 @@ lazy_scan_heap(Relation onerel, VacuumParams *params, LVRelStats *vacrelstats,
*/
if (ItemIdIsDead(itemid))
{
lazy_record_dead_tuple(vacrelstats, &(tuple.t_self));
lazy_record_dead_tuple(dead_tuples, &(tuple.t_self));
all_visible = false;
continue;
}
......@@ -1147,7 +1376,7 @@ lazy_scan_heap(Relation onerel, VacuumParams *params, LVRelStats *vacrelstats,
if (tupgone)
{
lazy_record_dead_tuple(vacrelstats, &(tuple.t_self));
lazy_record_dead_tuple(dead_tuples, &(tuple.t_self));
HeapTupleHeaderAdvanceLatestRemovedXid(tuple.t_data,
&vacrelstats->latestRemovedXid);
tups_vacuumed += 1;
......@@ -1217,7 +1446,7 @@ lazy_scan_heap(Relation onerel, VacuumParams *params, LVRelStats *vacrelstats,
* doing a second scan. Also we don't do that but forget dead tuples
* when index cleanup is disabled.
*/
if (!vacrelstats->useindex && vacrelstats->num_dead_tuples > 0)
if (!vacrelstats->useindex && dead_tuples->num_tuples > 0)
{
if (nindexes == 0)
{
......@@ -1246,7 +1475,7 @@ lazy_scan_heap(Relation onerel, VacuumParams *params, LVRelStats *vacrelstats,
* not to reset latestRemovedXid since we want that value to be
* valid.
*/
vacrelstats->num_dead_tuples = 0;
dead_tuples->num_tuples = 0;
/*
* Periodically do incremental FSM vacuuming to make newly-freed
......@@ -1361,7 +1590,7 @@ lazy_scan_heap(Relation onerel, VacuumParams *params, LVRelStats *vacrelstats,
* page, so remember its free space as-is. (This path will always be
* taken if there are no indexes.)
*/
if (vacrelstats->num_dead_tuples == prev_dead_count)
if (dead_tuples->num_tuples == prev_dead_count)
RecordPageWithFreeSpace(onerel, blkno, freespace);
}
......@@ -1395,11 +1624,11 @@ lazy_scan_heap(Relation onerel, VacuumParams *params, LVRelStats *vacrelstats,
/* If any tuples need to be deleted, perform final vacuum cycle */
/* XXX put a threshold on min number of tuples here? */
if (vacrelstats->num_dead_tuples > 0)
if (dead_tuples->num_tuples > 0)
{
/* Work on all the indexes, and then the heap */
lazy_vacuum_all_indexes(onerel, vacrelstats, Irel, nindexes,
indstats);
lazy_vacuum_all_indexes(onerel, Irel, indstats, vacrelstats,
lps, nindexes);
/* Remove tuples from heap */
lazy_vacuum_heap(onerel, vacrelstats);
......@@ -1412,17 +1641,22 @@ lazy_scan_heap(Relation onerel, VacuumParams *params, LVRelStats *vacrelstats,
if (blkno > next_fsm_block_to_vacuum)
FreeSpaceMapVacuumRange(onerel, next_fsm_block_to_vacuum, blkno);
/* report all blocks vacuumed; and that we're cleaning up */
/* report all blocks vacuumed */
pgstat_progress_update_param(PROGRESS_VACUUM_HEAP_BLKS_VACUUMED, blkno);
pgstat_progress_update_param(PROGRESS_VACUUM_PHASE,
PROGRESS_VACUUM_PHASE_INDEX_CLEANUP);
/* Do post-vacuum cleanup and statistics update for each index */
/* Do post-vacuum cleanup */
if (vacrelstats->useindex)
{
for (i = 0; i < nindexes; i++)
lazy_cleanup_index(Irel[i], indstats[i], vacrelstats);
}
lazy_cleanup_all_indexes(Irel, indstats, vacrelstats, lps, nindexes);
/*
* End parallel mode before updating index statistics as we cannot write
* during parallel mode.
*/
if (ParallelVacuumIsActive(lps))
end_parallel_vacuum(Irel, indstats, lps, nindexes);
/* Update index statistics */
update_index_statistics(Irel, indstats, nindexes);
/* If no indexes, make log report that lazy_vacuum_heap would've made */
if (vacuumed_pages)
......@@ -1467,15 +1701,16 @@ lazy_scan_heap(Relation onerel, VacuumParams *params, LVRelStats *vacrelstats,
/*
* lazy_vacuum_all_indexes() -- vacuum all indexes of relation.
*
* This is a utility wrapper for lazy_vacuum_index(), able to do
* progress reporting.
* We process the indexes serially unless we are doing parallel vacuum.
*/
static void
lazy_vacuum_all_indexes(Relation onerel, LVRelStats *vacrelstats,
Relation *Irel, int nindexes,
IndexBulkDeleteResult **indstats)
lazy_vacuum_all_indexes(Relation onerel, Relation *Irel,
IndexBulkDeleteResult **stats,
LVRelStats *vacrelstats, LVParallelState *lps,
int nindexes)
{
int i;
Assert(!IsParallelWorker());
Assert(nindexes > 0);
/* Log cleanup info before we touch indexes */
vacuum_log_cleanup_info(onerel, vacrelstats);
......@@ -1484,9 +1719,30 @@ lazy_vacuum_all_indexes(Relation onerel, LVRelStats *vacrelstats,
pgstat_progress_update_param(PROGRESS_VACUUM_PHASE,
PROGRESS_VACUUM_PHASE_VACUUM_INDEX);
/* Remove index entries */
for (i = 0; i < nindexes; i++)
lazy_vacuum_index(Irel[i], &indstats[i], vacrelstats);
/* Perform index vacuuming with parallel workers for parallel vacuum. */
if (ParallelVacuumIsActive(lps))
{
/* Tell parallel workers to do index vacuuming */
lps->lvshared->for_cleanup = false;
lps->lvshared->first_time = false;
/*
* We can only provide an approximate value of num_heap_tuples in
* vacuum cases.
*/
lps->lvshared->reltuples = vacrelstats->old_live_tuples;
lps->lvshared->estimated_count = true;
lazy_parallel_vacuum_indexes(Irel, stats, vacrelstats, lps, nindexes);
}
else
{
int idx;
for (idx = 0; idx < nindexes; idx++)
lazy_vacuum_index(Irel[idx], &stats[idx], vacrelstats->dead_tuples,
vacrelstats->old_live_tuples);
}
/* Increase and report the number of index scans */
vacrelstats->num_index_scans++;
......@@ -1522,7 +1778,7 @@ lazy_vacuum_heap(Relation onerel, LVRelStats *vacrelstats)
npages = 0;
tupindex = 0;
while (tupindex < vacrelstats->num_dead_tuples)
while (tupindex < vacrelstats->dead_tuples->num_tuples)
{
BlockNumber tblk;
Buffer buf;
......@@ -1531,7 +1787,7 @@ lazy_vacuum_heap(Relation onerel, LVRelStats *vacrelstats)
vacuum_delay_point();
tblk = ItemPointerGetBlockNumber(&vacrelstats->dead_tuples[tupindex]);
tblk = ItemPointerGetBlockNumber(&vacrelstats->dead_tuples->itemptrs[tupindex]);
buf = ReadBufferExtended(onerel, MAIN_FORKNUM, tblk, RBM_NORMAL,
vac_strategy);
if (!ConditionalLockBufferForCleanup(buf))
......@@ -1579,6 +1835,7 @@ static int
lazy_vacuum_page(Relation onerel, BlockNumber blkno, Buffer buffer,
int tupindex, LVRelStats *vacrelstats, Buffer *vmbuffer)
{
LVDeadTuples *dead_tuples = vacrelstats->dead_tuples;
Page page = BufferGetPage(buffer);
OffsetNumber unused[MaxOffsetNumber];
int uncnt = 0;
......@@ -1589,16 +1846,16 @@ lazy_vacuum_page(Relation onerel, BlockNumber blkno, Buffer buffer,
START_CRIT_SECTION();
for (; tupindex < vacrelstats->num_dead_tuples; tupindex++)
for (; tupindex < dead_tuples->num_tuples; tupindex++)
{
BlockNumber tblk;
OffsetNumber toff;
ItemId itemid;
tblk = ItemPointerGetBlockNumber(&vacrelstats->dead_tuples[tupindex]);
tblk = ItemPointerGetBlockNumber(&dead_tuples->itemptrs[tupindex]);
if (tblk != blkno)
break; /* past end of tuples for this block */
toff = ItemPointerGetOffsetNumber(&vacrelstats->dead_tuples[tupindex]);
toff = ItemPointerGetOffsetNumber(&dead_tuples->itemptrs[tupindex]);
itemid = PageGetItemId(page, toff);
ItemIdSetUnused(itemid);
unused[uncnt++] = toff;
......@@ -1719,19 +1976,344 @@ lazy_check_needs_freeze(Buffer buf, bool *hastup)
return false;
}
/*
* Perform index vacuum or index cleanup with parallel workers. This function
* must be used by the parallel vacuum leader process. The caller must set
* lps->lvshared->for_cleanup to indicate whether to perform vacuum or
* cleanup.
*/
static void
lazy_parallel_vacuum_indexes(Relation *Irel, IndexBulkDeleteResult **stats,
LVRelStats *vacrelstats, LVParallelState *lps,
int nindexes)
{
int nworkers;
Assert(!IsParallelWorker());
Assert(ParallelVacuumIsActive(lps));
Assert(nindexes > 0);
/* Determine the number of parallel workers to launch */
if (lps->lvshared->for_cleanup)
{
if (lps->lvshared->first_time)
nworkers = lps->nindexes_parallel_cleanup +
lps->nindexes_parallel_condcleanup;
else
nworkers = lps->nindexes_parallel_cleanup;
}
else
nworkers = lps->nindexes_parallel_bulkdel;
/* The leader process will participate */
nworkers--;
/*
* It is possible that parallel context is initialized with fewer workers
* than the number of indexes that need a separate worker in the current
* phase, so we need to consider it. See compute_parallel_vacuum_workers.
*/
nworkers = Min(nworkers, lps->pcxt->nworkers);
/* Setup the shared cost-based vacuum delay and launch workers */
if (nworkers > 0)
{
if (vacrelstats->num_index_scans > 0)
{
/* Reset the parallel index processing counter */
pg_atomic_write_u32(&(lps->lvshared->idx), 0);
/* Reinitialize the parallel context to relaunch parallel workers */
ReinitializeParallelDSM(lps->pcxt);
}
/*
* Set up shared cost balance and the number of active workers for
* vacuum delay. We need to do this before launching workers as
* otherwise, they might not see the updated values for these
* parameters.
*/
pg_atomic_write_u32(&(lps->lvshared->cost_balance), VacuumCostBalance);
pg_atomic_write_u32(&(lps->lvshared->active_nworkers), 0);
/*
* The number of workers can vary between bulkdelete and cleanup
* phase.
*/
ReinitializeParallelWorkers(lps->pcxt, nworkers);
LaunchParallelWorkers(lps->pcxt);
if (lps->pcxt->nworkers_launched > 0)
{
/*
* Reset the local cost values for leader backend as we have
* already accumulated the remaining balance of heap.
*/
VacuumCostBalance = 0;
VacuumCostBalanceLocal = 0;
/* Enable shared cost balance for leader backend */
VacuumSharedCostBalance = &(lps->lvshared->cost_balance);
VacuumActiveNWorkers = &(lps->lvshared->active_nworkers);
}
if (lps->lvshared->for_cleanup)
ereport(elevel,
(errmsg(ngettext("launched %d parallel vacuum worker for index cleanup (planned: %d)",
"launched %d parallel vacuum workers for index cleanup (planned: %d)",
lps->pcxt->nworkers_launched),
lps->pcxt->nworkers_launched, nworkers)));
else
ereport(elevel,
(errmsg(ngettext("launched %d parallel vacuum worker for index vacuuming (planned: %d)",
"launched %d parallel vacuum workers for index vacuuming (planned: %d)",
lps->pcxt->nworkers_launched),
lps->pcxt->nworkers_launched, nworkers)));
}
/* Process the indexes that can be processed by only leader process */
vacuum_indexes_leader(Irel, stats, vacrelstats, lps, nindexes);
/*
* Join as a parallel worker. The leader process alone processes all the
* indexes in the case where no workers are launched.
*/
parallel_vacuum_index(Irel, stats, lps->lvshared,
vacrelstats->dead_tuples, nindexes);
/* Wait for all vacuum workers to finish */
WaitForParallelWorkersToFinish(lps->pcxt);
/*
* Carry the shared balance value to heap scan and disable shared costing
*/
if (VacuumSharedCostBalance)
{
VacuumCostBalance = pg_atomic_read_u32(VacuumSharedCostBalance);
VacuumSharedCostBalance = NULL;
VacuumActiveNWorkers = NULL;
}
}
/*
* Index vacuum/cleanup routine used by the leader process and parallel
* vacuum worker processes to process the indexes in parallel.
*/
static void
parallel_vacuum_index(Relation *Irel, IndexBulkDeleteResult **stats,
LVShared *lvshared, LVDeadTuples *dead_tuples,
int nindexes)
{
/*
* Increment the active worker count if we are able to launch any worker.
*/
if (VacuumActiveNWorkers)
pg_atomic_add_fetch_u32(VacuumActiveNWorkers, 1);
/* Loop until all indexes are vacuumed */
for (;;)
{
int idx;
LVSharedIndStats *shared_indstats;
/* Get an index number to process */
idx = pg_atomic_fetch_add_u32(&(lvshared->idx), 1);
/* Done for all indexes? */
if (idx >= nindexes)
break;
/* Get the index statistics of this index from DSM */
shared_indstats = get_indstats(lvshared, idx);
/*
* Skip processing indexes that doesn't participate in parallel
* operation
*/
if (shared_indstats == NULL ||
skip_parallel_vacuum_index(Irel[idx], lvshared))
continue;
/* Do vacuum or cleanup of the index */
vacuum_one_index(Irel[idx], &(stats[idx]), lvshared, shared_indstats,
dead_tuples);
}
/*
* We have completed the index vacuum so decrement the active worker
* count.
*/
if (VacuumActiveNWorkers)
pg_atomic_sub_fetch_u32(VacuumActiveNWorkers, 1);
}
/*
* Vacuum or cleanup indexes that can be processed by only the leader process
* because these indexes don't support parallel operation at that phase.
*/
static void
vacuum_indexes_leader(Relation *Irel, IndexBulkDeleteResult **stats,
LVRelStats *vacrelstats, LVParallelState *lps,
int nindexes)
{
int i;
Assert(!IsParallelWorker());
/*
* Increment the active worker count if we are able to launch any worker.
*/
if (VacuumActiveNWorkers)
pg_atomic_add_fetch_u32(VacuumActiveNWorkers, 1);
for (i = 0; i < nindexes; i++)
{
LVSharedIndStats *shared_indstats;
shared_indstats = get_indstats(lps->lvshared, i);
/* Process the indexes skipped by parallel workers */
if (shared_indstats == NULL ||
skip_parallel_vacuum_index(Irel[i], lps->lvshared))
vacuum_one_index(Irel[i], &(stats[i]), lps->lvshared,
shared_indstats, vacrelstats->dead_tuples);
}
/*
* We have completed the index vacuum so decrement the active worker
* count.
*/
if (VacuumActiveNWorkers)
pg_atomic_sub_fetch_u32(VacuumActiveNWorkers, 1);
}
/*
* Vacuum or cleanup index either by leader process or by one of the worker
* process. After processing the index this function copies the index
* statistics returned from ambulkdelete and amvacuumcleanup to the DSM
* segment.
*/
static void
vacuum_one_index(Relation indrel, IndexBulkDeleteResult **stats,
LVShared *lvshared, LVSharedIndStats *shared_indstats,
LVDeadTuples *dead_tuples)
{
IndexBulkDeleteResult *bulkdelete_res = NULL;
if (shared_indstats)
{
/* Get the space for IndexBulkDeleteResult */
bulkdelete_res = &(shared_indstats->stats);
/*
* Update the pointer to the corresponding bulk-deletion result if
* someone has already updated it.
*/
if (shared_indstats->updated && *stats == NULL)
*stats = bulkdelete_res;
}
/* Do vacuum or cleanup of the index */
if (lvshared->for_cleanup)
lazy_cleanup_index(indrel, stats, lvshared->reltuples,
lvshared->estimated_count);
else
lazy_vacuum_index(indrel, stats, dead_tuples,
lvshared->reltuples);
/*
* Copy the index bulk-deletion result returned from ambulkdelete and
* amvacuumcleanup to the DSM segment if it's the first time to get it
* from them, because they allocate it locally and it's possible that an
* index will be vacuumed by the different vacuum process at the next
* time. The copying of the result normally happens only after the first
* time of index vacuuming. From the second time, we pass the result on
* the DSM segment so that they then update it directly.
*
* Since all vacuum workers write the bulk-deletion result at different
* slots we can write them without locking.
*/
if (shared_indstats && !shared_indstats->updated && *stats != NULL)
{
memcpy(bulkdelete_res, *stats, sizeof(IndexBulkDeleteResult));
shared_indstats->updated = true;
/*
* Now that the stats[idx] points to the DSM segment, we don't need
* the locally allocated results.
*/
pfree(*stats);
*stats = bulkdelete_res;
}
}
/*
* lazy_cleanup_all_indexes() -- cleanup all indexes of relation.
*
* Cleanup indexes. We process the indexes serially unless we are doing
* parallel vacuum.
*/
static void
lazy_cleanup_all_indexes(Relation *Irel, IndexBulkDeleteResult **stats,
LVRelStats *vacrelstats, LVParallelState *lps,
int nindexes)
{
int idx;
Assert(!IsParallelWorker());
Assert(nindexes > 0);
/* Report that we are now cleaning up indexes */
pgstat_progress_update_param(PROGRESS_VACUUM_PHASE,
PROGRESS_VACUUM_PHASE_INDEX_CLEANUP);
/*
* If parallel vacuum is active we perform index cleanup with parallel
* workers.
*/
if (ParallelVacuumIsActive(lps))
{
/* Tell parallel workers to do index cleanup */
lps->lvshared->for_cleanup = true;
lps->lvshared->first_time =
(vacrelstats->num_index_scans == 0);
/*
* Now we can provide a better estimate of total number of surviving
* tuples (we assume indexes are more interested in that than in the
* number of nominally live tuples).
*/
lps->lvshared->reltuples = vacrelstats->new_rel_tuples;
lps->lvshared->estimated_count =
(vacrelstats->tupcount_pages < vacrelstats->rel_pages);
lazy_parallel_vacuum_indexes(Irel, stats, vacrelstats, lps, nindexes);
}
else
{
for (idx = 0; idx < nindexes; idx++)
lazy_cleanup_index(Irel[idx], &stats[idx],
vacrelstats->new_rel_tuples,
vacrelstats->tupcount_pages < vacrelstats->rel_pages);
}
}
/*
* lazy_vacuum_index() -- vacuum one index relation.
*
* Delete all the index entries pointing to tuples listed in
* vacrelstats->dead_tuples, and update running statistics.
* dead_tuples, and update running statistics.
*
* reltuples is the number of heap tuples to be passed to the
* bulkdelete callback.
*/
static void
lazy_vacuum_index(Relation indrel,
IndexBulkDeleteResult **stats,
LVRelStats *vacrelstats)
lazy_vacuum_index(Relation indrel, IndexBulkDeleteResult **stats,
LVDeadTuples *dead_tuples, double reltuples)
{
IndexVacuumInfo ivinfo;
const char *msg;
PGRUsage ru0;
pg_rusage_init(&ru0);
......@@ -1741,30 +2323,38 @@ lazy_vacuum_index(Relation indrel,
ivinfo.report_progress = false;
ivinfo.estimated_count = true;
ivinfo.message_level = elevel;
/* We can only provide an approximate value of num_heap_tuples here */
ivinfo.num_heap_tuples = vacrelstats->old_live_tuples;
ivinfo.num_heap_tuples = reltuples;
ivinfo.strategy = vac_strategy;
/* Do bulk deletion */
*stats = index_bulk_delete(&ivinfo, *stats,
lazy_tid_reaped, (void *) vacrelstats);
lazy_tid_reaped, (void *) dead_tuples);
if (IsParallelWorker())
msg = gettext_noop("scanned index \"%s\" to remove %d row versions by parallel vacuum worker");
else
msg = gettext_noop("scanned index \"%s\" to remove %d row versions");
ereport(elevel,
(errmsg("scanned index \"%s\" to remove %d row versions",
(errmsg(msg,
RelationGetRelationName(indrel),
vacrelstats->num_dead_tuples),
dead_tuples->num_tuples),
errdetail_internal("%s", pg_rusage_show(&ru0))));
}
/*
* lazy_cleanup_index() -- do post-vacuum cleanup for one index relation.
*
* reltuples is the number of heap tuples and estimated_count is true
* if the reltuples is an estimated value.
*/
static void
lazy_cleanup_index(Relation indrel,
IndexBulkDeleteResult *stats,
LVRelStats *vacrelstats)
IndexBulkDeleteResult **stats,
double reltuples, bool estimated_count)
{
IndexVacuumInfo ivinfo;
const char *msg;
PGRUsage ru0;
pg_rusage_init(&ru0);
......@@ -1772,49 +2362,33 @@ lazy_cleanup_index(Relation indrel,
ivinfo.index = indrel;
ivinfo.analyze_only = false;
ivinfo.report_progress = false;
ivinfo.estimated_count = (vacrelstats->tupcount_pages < vacrelstats->rel_pages);
ivinfo.estimated_count = estimated_count;
ivinfo.message_level = elevel;
/*
* Now we can provide a better estimate of total number of surviving
* tuples (we assume indexes are more interested in that than in the
* number of nominally live tuples).
*/
ivinfo.num_heap_tuples = vacrelstats->new_rel_tuples;
ivinfo.num_heap_tuples = reltuples;
ivinfo.strategy = vac_strategy;
stats = index_vacuum_cleanup(&ivinfo, stats);
*stats = index_vacuum_cleanup(&ivinfo, *stats);
if (!stats)
if (!(*stats))
return;
/*
* Now update statistics in pg_class, but only if the index says the count
* is accurate.
*/
if (!stats->estimated_count)
vac_update_relstats(indrel,
stats->num_pages,
stats->num_index_tuples,
0,
false,
InvalidTransactionId,
InvalidMultiXactId,
false);
if (IsParallelWorker())
msg = gettext_noop("index \"%s\" now contains %.0f row versions in %u pages as reported by parallel vacuum worker");
else
msg = gettext_noop("index \"%s\" now contains %.0f row versions in %u pages");
ereport(elevel,
(errmsg("index \"%s\" now contains %.0f row versions in %u pages",
(errmsg(msg,
RelationGetRelationName(indrel),
stats->num_index_tuples,
stats->num_pages),
(*stats)->num_index_tuples,
(*stats)->num_pages),
errdetail("%.0f index row versions were removed.\n"
"%u index pages have been deleted, %u are currently reusable.\n"
"%s.",
stats->tuples_removed,
stats->pages_deleted, stats->pages_free,
(*stats)->tuples_removed,
(*stats)->pages_deleted, (*stats)->pages_free,
pg_rusage_show(&ru0))));
pfree(stats);
}
/*
......@@ -2122,19 +2696,17 @@ count_nondeletable_pages(Relation onerel, LVRelStats *vacrelstats)
}
/*
* lazy_space_alloc - space allocation decisions for lazy vacuum
*
* See the comments at the head of this file for rationale.
* Return the maximum number of dead tuples we can record.
*/
static void
lazy_space_alloc(LVRelStats *vacrelstats, BlockNumber relblocks)
static long
compute_max_dead_tuples(BlockNumber relblocks, bool useindex)
{
long maxtuples;
int vac_work_mem = IsAutoVacuumWorkerProcess() &&
autovacuum_work_mem != -1 ?
autovacuum_work_mem : maintenance_work_mem;
if (vacrelstats->useindex)
if (useindex)
{
maxtuples = (vac_work_mem * 1024L) / sizeof(ItemPointerData);
maxtuples = Min(maxtuples, INT_MAX);
......@@ -2148,34 +2720,48 @@ lazy_space_alloc(LVRelStats *vacrelstats, BlockNumber relblocks)
maxtuples = Max(maxtuples, MaxHeapTuplesPerPage);
}
else
{
maxtuples = MaxHeapTuplesPerPage;
}
vacrelstats->num_dead_tuples = 0;
vacrelstats->max_dead_tuples = (int) maxtuples;
vacrelstats->dead_tuples = (ItemPointer)
palloc(maxtuples * sizeof(ItemPointerData));
return maxtuples;
}
/*
* lazy_space_alloc - space allocation decisions for lazy vacuum
*
* See the comments at the head of this file for rationale.
*/
static void
lazy_space_alloc(LVRelStats *vacrelstats, BlockNumber relblocks)
{
LVDeadTuples *dead_tuples = NULL;
long maxtuples;
maxtuples = compute_max_dead_tuples(relblocks, vacrelstats->useindex);
dead_tuples = (LVDeadTuples *) palloc(SizeOfLVDeadTuples(maxtuples));
dead_tuples->num_tuples = 0;
dead_tuples->max_tuples = (int) maxtuples;
vacrelstats->dead_tuples = dead_tuples;
}
/*
* lazy_record_dead_tuple - remember one deletable tuple
*/
static void
lazy_record_dead_tuple(LVRelStats *vacrelstats,
ItemPointer itemptr)
lazy_record_dead_tuple(LVDeadTuples *dead_tuples, ItemPointer itemptr)
{
/*
* The array shouldn't overflow under normal behavior, but perhaps it
* could if we are given a really small maintenance_work_mem. In that
* case, just forget the last few tuples (we'll get 'em next time).
*/
if (vacrelstats->num_dead_tuples < vacrelstats->max_dead_tuples)
if (dead_tuples->num_tuples < dead_tuples->max_tuples)
{
vacrelstats->dead_tuples[vacrelstats->num_dead_tuples] = *itemptr;
vacrelstats->num_dead_tuples++;
dead_tuples->itemptrs[dead_tuples->num_tuples] = *itemptr;
dead_tuples->num_tuples++;
pgstat_progress_update_param(PROGRESS_VACUUM_NUM_DEAD_TUPLES,
vacrelstats->num_dead_tuples);
dead_tuples->num_tuples);
}
}
......@@ -2189,12 +2775,12 @@ lazy_record_dead_tuple(LVRelStats *vacrelstats,
static bool
lazy_tid_reaped(ItemPointer itemptr, void *state)
{
LVRelStats *vacrelstats = (LVRelStats *) state;
LVDeadTuples *dead_tuples = (LVDeadTuples *) state;
ItemPointer res;
res = (ItemPointer) bsearch((void *) itemptr,
(void *) vacrelstats->dead_tuples,
vacrelstats->num_dead_tuples,
(void *) dead_tuples->itemptrs,
dead_tuples->num_tuples,
sizeof(ItemPointerData),
vac_cmp_itemptr);
......@@ -2342,3 +2928,447 @@ heap_page_is_all_visible(Relation rel, Buffer buf,
return all_visible;
}
/*
* Compute the number of parallel worker processes to request. Both index
* vacuum and index cleanup can be executed with parallel workers. The index
* is eligible for parallel vacuum iff it's size is greater than
* min_parallel_index_scan_size as invoking workers for very small indexes
* can hurt the performance.
*
* nrequested is the number of parallel workers that user requested. If
* nrequested is 0, we compute the parallel degree based on nindexes, that is
* the number of indexes that support parallel vacuum. This function also
* sets can_parallel_vacuum to remember indexes that participate in parallel
* vacuum.
*/
static int
compute_parallel_vacuum_workers(Relation *Irel, int nindexes, int nrequested,
bool *can_parallel_vacuum)
{
int nindexes_parallel = 0;
int nindexes_parallel_bulkdel = 0;
int nindexes_parallel_cleanup = 0;
int parallel_workers;
int i;
/*
* We don't allow to perform parallel operation in standalone backend or
* when parallelism is disabled.
*/
if (!IsUnderPostmaster || max_parallel_maintenance_workers == 0)
return 0;
/*
* Compute the number of indexes that can participate in parallel vacuum.
*/
for (i = 0; i < nindexes; i++)
{
uint8 vacoptions = Irel[i]->rd_indam->amparallelvacuumoptions;
if (vacoptions == VACUUM_OPTION_NO_PARALLEL ||
RelationGetNumberOfBlocks(Irel[i]) < min_parallel_index_scan_size)
continue;
can_parallel_vacuum[i] = true;
if ((vacoptions & VACUUM_OPTION_PARALLEL_BULKDEL) != 0)
nindexes_parallel_bulkdel++;
if (((vacoptions & VACUUM_OPTION_PARALLEL_CLEANUP) != 0) ||
((vacoptions & VACUUM_OPTION_PARALLEL_COND_CLEANUP) != 0))
nindexes_parallel_cleanup++;
}
nindexes_parallel = Max(nindexes_parallel_bulkdel,
nindexes_parallel_cleanup);
/* The leader process takes one index */
nindexes_parallel--;
/* No index supports parallel vacuum */
if (nindexes_parallel <= 0)
return 0;
/* Compute the parallel degree */
parallel_workers = (nrequested > 0) ?
Min(nrequested, nindexes_parallel) : nindexes_parallel;
/* Cap by max_parallel_maintenance_workers */
parallel_workers = Min(parallel_workers, max_parallel_maintenance_workers);
return parallel_workers;
}
/*
* Initialize variables for shared index statistics, set NULL bitmap and the
* size of stats for each index.
*/
static void
prepare_index_statistics(LVShared *lvshared, bool *can_parallel_vacuum,
int nindexes)
{
int i;
/* Currently, we don't support parallel vacuum for autovacuum */
Assert(!IsAutoVacuumWorkerProcess());
/* Set NULL for all indexes */
memset(lvshared->bitmap, 0x00, BITMAPLEN(nindexes));
for (i = 0; i < nindexes; i++)
{
if (!can_parallel_vacuum[i])
continue;
/* Set NOT NULL as this index do support parallelism */
lvshared->bitmap[i >> 3] |= 1 << (i & 0x07);
}
}
/*
* Update index statistics in pg_class if the statistics is accurate.
*/
static void
update_index_statistics(Relation *Irel, IndexBulkDeleteResult **stats,
int nindexes)
{
int i;
Assert(!IsInParallelMode());
for (i = 0; i < nindexes; i++)
{
if (stats[i] == NULL || stats[i]->estimated_count)
continue;
/* Update index statistics */
vac_update_relstats(Irel[i],
stats[i]->num_pages,
stats[i]->num_index_tuples,
0,
false,
InvalidTransactionId,
InvalidMultiXactId,
false);
pfree(stats[i]);
}
}
/*
* This function prepares and returns parallel vacuum state if we can launch
* even one worker. This function is responsible to enter parallel mode,
* create a parallel context, and then initialize the DSM segment.
*/
static LVParallelState *
begin_parallel_vacuum(Oid relid, Relation *Irel, LVRelStats *vacrelstats,
BlockNumber nblocks, int nindexes, int nrequested)
{
LVParallelState *lps = NULL;
ParallelContext *pcxt;
LVShared *shared;
LVDeadTuples *dead_tuples;
bool *can_parallel_vacuum;
long maxtuples;
char *sharedquery;
Size est_shared;
Size est_deadtuples;
int nindexes_mwm = 0;
int parallel_workers = 0;
int querylen;
int i;
/*
* A parallel vacuum must be requested and there must be indexes on the
* relation
*/
Assert(nrequested >= 0);
Assert(nindexes > 0);
/*
* Compute the number of parallel vacuum workers to launch
*/
can_parallel_vacuum = (bool *) palloc0(sizeof(bool) * nindexes);
parallel_workers = compute_parallel_vacuum_workers(Irel, nindexes,
nrequested,
can_parallel_vacuum);
/* Can't perform vacuum in parallel */
if (parallel_workers <= 0)
{
pfree(can_parallel_vacuum);
return lps;
}
lps = (LVParallelState *) palloc0(sizeof(LVParallelState));
EnterParallelMode();
pcxt = CreateParallelContext("postgres", "parallel_vacuum_main",
parallel_workers);
Assert(pcxt->nworkers > 0);
lps->pcxt = pcxt;
/* Estimate size for shared information -- PARALLEL_VACUUM_KEY_SHARED */
est_shared = MAXALIGN(add_size(SizeOfLVShared, BITMAPLEN(nindexes)));
for (i = 0; i < nindexes; i++)
{
uint8 vacoptions = Irel[i]->rd_indam->amparallelvacuumoptions;
/*
* Cleanup option should be either disabled, always performing in
* parallel or conditionally performing in parallel.
*/
Assert(((vacoptions & VACUUM_OPTION_PARALLEL_CLEANUP) == 0) ||
((vacoptions & VACUUM_OPTION_PARALLEL_COND_CLEANUP) == 0));
Assert(vacoptions <= VACUUM_OPTION_MAX_VALID_VALUE);
/* Skip indexes that don't participate in parallel vacuum */
if (!can_parallel_vacuum[i])
continue;
if (Irel[i]->rd_indam->amusemaintenanceworkmem)
nindexes_mwm++;
est_shared = add_size(est_shared, sizeof(LVSharedIndStats));
/*
* Remember the number of indexes that support parallel operation for
* each phase.
*/
if ((vacoptions & VACUUM_OPTION_PARALLEL_BULKDEL) != 0)
lps->nindexes_parallel_bulkdel++;
if ((vacoptions & VACUUM_OPTION_PARALLEL_CLEANUP) != 0)
lps->nindexes_parallel_cleanup++;
if ((vacoptions & VACUUM_OPTION_PARALLEL_COND_CLEANUP) != 0)
lps->nindexes_parallel_condcleanup++;
}
shm_toc_estimate_chunk(&pcxt->estimator, est_shared);
shm_toc_estimate_keys(&pcxt->estimator, 1);
/* Estimate size for dead tuples -- PARALLEL_VACUUM_KEY_DEAD_TUPLES */
maxtuples = compute_max_dead_tuples(nblocks, true);
est_deadtuples = MAXALIGN(SizeOfLVDeadTuples(maxtuples));
shm_toc_estimate_chunk(&pcxt->estimator, est_deadtuples);
shm_toc_estimate_keys(&pcxt->estimator, 1);
/* Finally, estimate PARALLEL_VACUUM_KEY_QUERY_TEXT space */
querylen = strlen(debug_query_string);
shm_toc_estimate_chunk(&pcxt->estimator, querylen + 1);
shm_toc_estimate_keys(&pcxt->estimator, 1);
InitializeParallelDSM(pcxt);
/* Prepare shared information */
shared = (LVShared *) shm_toc_allocate(pcxt->toc, est_shared);
MemSet(shared, 0, est_shared);
shared->relid = relid;
shared->elevel = elevel;
shared->maintenance_work_mem_worker =
(nindexes_mwm > 0) ?
maintenance_work_mem / Min(parallel_workers, nindexes_mwm) :
maintenance_work_mem;
pg_atomic_init_u32(&(shared->cost_balance), 0);
pg_atomic_init_u32(&(shared->active_nworkers), 0);
pg_atomic_init_u32(&(shared->idx), 0);
shared->offset = MAXALIGN(add_size(SizeOfLVShared, BITMAPLEN(nindexes)));
prepare_index_statistics(shared, can_parallel_vacuum, nindexes);
shm_toc_insert(pcxt->toc, PARALLEL_VACUUM_KEY_SHARED, shared);
lps->lvshared = shared;
/* Prepare the dead tuple space */
dead_tuples = (LVDeadTuples *) shm_toc_allocate(pcxt->toc, est_deadtuples);
dead_tuples->max_tuples = maxtuples;
dead_tuples->num_tuples = 0;
MemSet(dead_tuples->itemptrs, 0, sizeof(ItemPointerData) * maxtuples);
shm_toc_insert(pcxt->toc, PARALLEL_VACUUM_KEY_DEAD_TUPLES, dead_tuples);
vacrelstats->dead_tuples = dead_tuples;
/* Store query string for workers */
sharedquery = (char *) shm_toc_allocate(pcxt->toc, querylen + 1);
memcpy(sharedquery, debug_query_string, querylen + 1);
sharedquery[querylen] = '\0';
shm_toc_insert(pcxt->toc, PARALLEL_VACUUM_KEY_QUERY_TEXT, sharedquery);
pfree(can_parallel_vacuum);
return lps;
}
/*
* Destroy the parallel context, and end parallel mode.
*
* Since writes are not allowed during the parallel mode, so we copy the
* updated index statistics from DSM in local memory and then later use that
* to update the index statistics. One might think that we can exit from
* parallel mode, update the index statistics and then destroy parallel
* context, but that won't be safe (see ExitParallelMode).
*/
static void
end_parallel_vacuum(Relation *Irel, IndexBulkDeleteResult **stats,
LVParallelState *lps, int nindexes)
{
int i;
Assert(!IsParallelWorker());
/* Copy the updated statistics */
for (i = 0; i < nindexes; i++)
{
LVSharedIndStats *indstats = get_indstats(lps->lvshared, i);
/*
* Skip unused slot. The statistics of this index are already stored
* in local memory.
*/
if (indstats == NULL)
continue;
if (indstats->updated)
{
stats[i] = (IndexBulkDeleteResult *) palloc0(sizeof(IndexBulkDeleteResult));
memcpy(stats[i], &(indstats->stats), sizeof(IndexBulkDeleteResult));
}
else
stats[i] = NULL;
}
DestroyParallelContext(lps->pcxt);
ExitParallelMode();
/* Deactivate parallel vacuum */
pfree(lps);
lps = NULL;
}
/* Return the Nth index statistics or NULL */
static LVSharedIndStats *
get_indstats(LVShared *lvshared, int n)
{
int i;
char *p;
if (IndStatsIsNull(lvshared, n))
return NULL;
p = (char *) GetSharedIndStats(lvshared);
for (i = 0; i < n; i++)
{
if (IndStatsIsNull(lvshared, i))
continue;
p += sizeof(LVSharedIndStats);
}
return (LVSharedIndStats *) p;
}
/*
* Returns true, if the given index can't participate in parallel index vacuum
* or parallel index cleanup, false, otherwise.
*/
static bool
skip_parallel_vacuum_index(Relation indrel, LVShared *lvshared)
{
uint8 vacoptions = indrel->rd_indam->amparallelvacuumoptions;
/* first_time must be true only if for_cleanup is true */
Assert(lvshared->for_cleanup || !lvshared->first_time);
if (lvshared->for_cleanup)
{
/* Skip, if the index does not support parallel cleanup */
if (((vacoptions & VACUUM_OPTION_PARALLEL_CLEANUP) == 0) &&
((vacoptions & VACUUM_OPTION_PARALLEL_COND_CLEANUP) == 0))
return true;
/*
* Skip, if the index supports parallel cleanup conditionally, but we
* have already processed the index (for bulkdelete). See the
* comments for option VACUUM_OPTION_PARALLEL_COND_CLEANUP to know
* when indexes support parallel cleanup conditionally.
*/
if (!lvshared->first_time &&
((vacoptions & VACUUM_OPTION_PARALLEL_COND_CLEANUP) != 0))
return true;
}
else if ((vacoptions & VACUUM_OPTION_PARALLEL_BULKDEL) == 0)
{
/* Skip if the index does not support parallel bulk deletion */
return true;
}
return false;
}
/*
* Perform work within a launched parallel process.
*
* Since parallel vacuum workers perform only index vacuum or index cleanup,
* we don't need to report the progress information.
*/
void
parallel_vacuum_main(dsm_segment *seg, shm_toc *toc)
{
Relation onerel;
Relation *indrels;
LVShared *lvshared;
LVDeadTuples *dead_tuples;
int nindexes;
char *sharedquery;
IndexBulkDeleteResult **stats;
lvshared = (LVShared *) shm_toc_lookup(toc, PARALLEL_VACUUM_KEY_SHARED,
false);
elevel = lvshared->elevel;
ereport(DEBUG1,
(errmsg("starting parallel vacuum worker for %s",
lvshared->for_cleanup ? "cleanup" : "bulk delete")));
/* Set debug_query_string for individual workers */
sharedquery = shm_toc_lookup(toc, PARALLEL_VACUUM_KEY_QUERY_TEXT, false);
debug_query_string = sharedquery;
pgstat_report_activity(STATE_RUNNING, debug_query_string);
/*
* Open table. The lock mode is the same as the leader process. It's
* okay because the lock mode does not conflict among the parallel
* workers.
*/
onerel = table_open(lvshared->relid, ShareUpdateExclusiveLock);
/*
* Open all indexes. indrels are sorted in order by OID, which should be
* matched to the leader's one.
*/
vac_open_indexes(onerel, RowExclusiveLock, &nindexes, &indrels);
Assert(nindexes > 0);
/* Set dead tuple space */
dead_tuples = (LVDeadTuples *) shm_toc_lookup(toc,
PARALLEL_VACUUM_KEY_DEAD_TUPLES,
false);
/* Set cost-based vacuum delay */
VacuumCostActive = (VacuumCostDelay > 0);
VacuumCostBalance = 0;
VacuumPageHit = 0;
VacuumPageMiss = 0;
VacuumPageDirty = 0;
VacuumCostBalanceLocal = 0;
VacuumSharedCostBalance = &(lvshared->cost_balance);
VacuumActiveNWorkers = &(lvshared->active_nworkers);
stats = (IndexBulkDeleteResult **)
palloc0(nindexes * sizeof(IndexBulkDeleteResult *));
if (lvshared->maintenance_work_mem_worker > 0)
maintenance_work_mem = lvshared->maintenance_work_mem_worker;
/* Process indexes to perform vacuum/cleanup */
parallel_vacuum_index(indrels, stats, lvshared, dead_tuples, nindexes);
vac_close_indexes(nindexes, indrels, RowExclusiveLock);
table_close(onerel, ShareUpdateExclusiveLock);
pfree(stats);
}
......@@ -14,6 +14,7 @@
#include "postgres.h"
#include "access/heapam.h"
#include "access/nbtree.h"
#include "access/parallel.h"
#include "access/session.h"
......@@ -139,6 +140,9 @@ static const struct
},
{
"_bt_parallel_build_main", _bt_parallel_build_main
},
{
"parallel_vacuum_main", parallel_vacuum_main
}
};
......@@ -174,6 +178,7 @@ CreateParallelContext(const char *library_name, const char *function_name,
pcxt = palloc0(sizeof(ParallelContext));
pcxt->subid = GetCurrentSubTransactionId();
pcxt->nworkers = nworkers;
pcxt->nworkers_to_launch = nworkers;
pcxt->library_name = pstrdup(library_name);
pcxt->function_name = pstrdup(function_name);
pcxt->error_context_stack = error_context_stack;
......@@ -486,6 +491,23 @@ ReinitializeParallelDSM(ParallelContext *pcxt)
}
}
/*
* Reinitialize parallel workers for a parallel context such that we could
* launch the different number of workers. This is required for cases where
* we need to reuse the same DSM segment, but the number of workers can
* vary from run-to-run.
*/
void
ReinitializeParallelWorkers(ParallelContext *pcxt, int nworkers_to_launch)
{
/*
* The number of workers that need to be launched must be less than the
* number of workers with which the parallel context is initialized.
*/
Assert(pcxt->nworkers >= nworkers_to_launch);
pcxt->nworkers_to_launch = nworkers_to_launch;
}
/*
* Launch parallel workers.
*/
......@@ -498,7 +520,7 @@ LaunchParallelWorkers(ParallelContext *pcxt)
bool any_registrations_failed = false;
/* Skip this if we have no workers. */
if (pcxt->nworkers == 0)
if (pcxt->nworkers == 0 || pcxt->nworkers_to_launch == 0)
return;
/* We need to be a lock group leader. */
......@@ -533,7 +555,7 @@ LaunchParallelWorkers(ParallelContext *pcxt)
* fails. It wouldn't help much anyway, because registering the worker in
* no way guarantees that it will start up and initialize successfully.
*/
for (i = 0; i < pcxt->nworkers; ++i)
for (i = 0; i < pcxt->nworkers_to_launch; ++i)
{
memcpy(worker.bgw_extra, &i, sizeof(int));
if (!any_registrations_failed &&
......
......@@ -42,6 +42,7 @@
#include "nodes/makefuncs.h"
#include "pgstat.h"
#include "postmaster/autovacuum.h"
#include "postmaster/bgworker_internals.h"
#include "storage/bufmgr.h"
#include "storage/lmgr.h"
#include "storage/proc.h"
......@@ -68,6 +69,14 @@ static MemoryContext vac_context = NULL;
static BufferAccessStrategy vac_strategy;
/*
* Variables for cost-based parallel vacuum. See comments atop
* compute_parallel_delay to understand how it works.
*/
pg_atomic_uint32 *VacuumSharedCostBalance = NULL;
pg_atomic_uint32 *VacuumActiveNWorkers = NULL;
int VacuumCostBalanceLocal = 0;
/* non-export function prototypes */
static List *expand_vacuum_rel(VacuumRelation *vrel, int options);
static List *get_all_vacuum_rels(int options);
......@@ -76,6 +85,7 @@ static void vac_truncate_clog(TransactionId frozenXID,
TransactionId lastSaneFrozenXid,
MultiXactId lastSaneMinMulti);
static bool vacuum_rel(Oid relid, RangeVar *relation, VacuumParams *params);
static double compute_parallel_delay(void);
static VacOptTernaryValue get_vacopt_ternary_value(DefElem *def);
/*
......@@ -94,12 +104,16 @@ ExecVacuum(ParseState *pstate, VacuumStmt *vacstmt, bool isTopLevel)
bool freeze = false;
bool full = false;
bool disable_page_skipping = false;
bool parallel_option = false;
ListCell *lc;
/* Set default value */
params.index_cleanup = VACOPT_TERNARY_DEFAULT;
params.truncate = VACOPT_TERNARY_DEFAULT;
/* By default parallel vacuum is enabled */
params.nworkers = 0;
/* Parse options list */
foreach(lc, vacstmt->options)
{
......@@ -129,6 +143,39 @@ ExecVacuum(ParseState *pstate, VacuumStmt *vacstmt, bool isTopLevel)
params.index_cleanup = get_vacopt_ternary_value(opt);
else if (strcmp(opt->defname, "truncate") == 0)
params.truncate = get_vacopt_ternary_value(opt);
else if (strcmp(opt->defname, "parallel") == 0)
{
parallel_option = true;
if (opt->arg == NULL)
{
ereport(ERROR,
(errcode(ERRCODE_SYNTAX_ERROR),
errmsg("parallel option requires a value between 0 and %d",
MAX_PARALLEL_WORKER_LIMIT),
parser_errposition(pstate, opt->location)));
}
else
{
int nworkers;
nworkers = defGetInt32(opt);
if (nworkers < 0 || nworkers > MAX_PARALLEL_WORKER_LIMIT)
ereport(ERROR,
(errcode(ERRCODE_SYNTAX_ERROR),
errmsg("parallel vacuum degree must be between 0 and %d",
MAX_PARALLEL_WORKER_LIMIT),
parser_errposition(pstate, opt->location)));
/*
* Disable parallel vacuum, if user has specified parallel
* degree as zero.
*/
if (nworkers == 0)
params.nworkers = -1;
else
params.nworkers = nworkers;
}
}
else
ereport(ERROR,
(errcode(ERRCODE_SYNTAX_ERROR),
......@@ -152,6 +199,11 @@ ExecVacuum(ParseState *pstate, VacuumStmt *vacstmt, bool isTopLevel)
!(params.options & (VACOPT_FULL | VACOPT_FREEZE)));
Assert(!(params.options & VACOPT_SKIPTOAST));
if ((params.options & VACOPT_FULL) && parallel_option)
ereport(ERROR,
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("cannot specify both FULL and PARALLEL options")));
/*
* Make sure VACOPT_ANALYZE is specified if any column lists are present.
*/
......@@ -383,6 +435,9 @@ vacuum(List *relations, VacuumParams *params,
VacuumPageHit = 0;
VacuumPageMiss = 0;
VacuumPageDirty = 0;
VacuumCostBalanceLocal = 0;
VacuumSharedCostBalance = NULL;
VacuumActiveNWorkers = NULL;
/*
* Loop to process each selected relation.
......@@ -1941,16 +1996,26 @@ vac_close_indexes(int nindexes, Relation *Irel, LOCKMODE lockmode)
void
vacuum_delay_point(void)
{
double msec = 0;
/* Always check for interrupts */
CHECK_FOR_INTERRUPTS();
/* Nap if appropriate */
if (VacuumCostActive && !InterruptPending &&
VacuumCostBalance >= VacuumCostLimit)
{
double msec;
if (!VacuumCostActive || InterruptPending)
return;
/*
* For parallel vacuum, the delay is computed based on the shared cost
* balance. See compute_parallel_delay.
*/
if (VacuumSharedCostBalance != NULL)
msec = compute_parallel_delay();
else if (VacuumCostBalance >= VacuumCostLimit)
msec = VacuumCostDelay * VacuumCostBalance / VacuumCostLimit;
/* Nap if appropriate */
if (msec > 0)
{
if (msec > VacuumCostDelay * 4)
msec = VacuumCostDelay * 4;
......@@ -1966,6 +2031,66 @@ vacuum_delay_point(void)
}
}
/*
* Computes the vacuum delay for parallel workers.
*
* The basic idea of a cost-based vacuum delay for parallel vacuum is to allow
* each worker to sleep proportional to the work done by it. We achieve this
* by allowing all parallel vacuum workers including the leader process to
* have a shared view of cost related parameters (mainly VacuumCostBalance).
* We allow each worker to update it as and when it has incurred any cost and
* then based on that decide whether it needs to sleep. We compute the time
* to sleep for a worker based on the cost it has incurred
* (VacuumCostBalanceLocal) and then reduce the VacuumSharedCostBalance by
* that amount. This avoids letting the workers sleep who have done less or
* no I/O as compared to other workers and therefore can ensure that workers
* who are doing more I/O got throttled more.
*
* We allow any worker to sleep only if it has performed the I/O above a
* certain threshold, which is calculated based on the number of active
* workers (VacuumActiveNWorkers), and the overall cost balance is more than
* VacuumCostLimit set by the system. The testing reveals that we achieve
* the required throttling if we allow a worker that has done more than 50%
* of its share of work to sleep.
*/
static double
compute_parallel_delay(void)
{
double msec = 0;
uint32 shared_balance;
int nworkers;
/* Parallel vacuum must be active */
Assert(VacuumSharedCostBalance);
nworkers = pg_atomic_read_u32(VacuumActiveNWorkers);
/* At least count itself */
Assert(nworkers >= 1);
/* Update the shared cost balance value atomically */
shared_balance = pg_atomic_add_fetch_u32(VacuumSharedCostBalance, VacuumCostBalance);
/* Compute the total local balance for the current worker */
VacuumCostBalanceLocal += VacuumCostBalance;
if ((shared_balance >= VacuumCostLimit) &&
(VacuumCostBalanceLocal > 0.5 * (VacuumCostLimit / nworkers)))
{
/* Compute sleep time based on the local cost balance */
msec = VacuumCostDelay * VacuumCostBalanceLocal / VacuumCostLimit;
pg_atomic_sub_fetch_u32(VacuumSharedCostBalance, VacuumCostBalanceLocal);
VacuumCostBalanceLocal = 0;
}
/*
* Reset the local balance as we accumulated it into the shared value.
*/
VacuumCostBalance = 0;
return msec;
}
/*
* A wrapper function of defGetBoolean().
*
......
......@@ -2886,6 +2886,8 @@ table_recheck_autovac(Oid relid, HTAB *table_toast_map,
(!wraparound ? VACOPT_SKIP_LOCKED : 0);
tab->at_params.index_cleanup = VACOPT_TERNARY_DEFAULT;
tab->at_params.truncate = VACOPT_TERNARY_DEFAULT;
/* As of now, we don't support parallel vacuum for autovacuum */
tab->at_params.nworkers = -1;
tab->at_params.freeze_min_age = freeze_min_age;
tab->at_params.freeze_table_age = freeze_table_age;
tab->at_params.multixact_freeze_min_age = multixact_freeze_min_age;
......
......@@ -3597,7 +3597,7 @@ psql_completion(const char *text, int start, int end)
if (ends_with(prev_wd, '(') || ends_with(prev_wd, ','))
COMPLETE_WITH("FULL", "FREEZE", "ANALYZE", "VERBOSE",
"DISABLE_PAGE_SKIPPING", "SKIP_LOCKED",
"INDEX_CLEANUP", "TRUNCATE");
"INDEX_CLEANUP", "TRUNCATE", "PARALLEL");
else if (TailMatches("FULL|FREEZE|ANALYZE|VERBOSE|DISABLE_PAGE_SKIPPING|SKIP_LOCKED|INDEX_CLEANUP|TRUNCATE"))
COMPLETE_WITH("ON", "OFF");
}
......
......@@ -23,7 +23,9 @@
#include "nodes/lockoptions.h"
#include "nodes/primnodes.h"
#include "storage/bufpage.h"
#include "storage/dsm.h"
#include "storage/lockdefs.h"
#include "storage/shm_toc.h"
#include "utils/relcache.h"
#include "utils/snapshot.h"
......@@ -193,6 +195,7 @@ extern Size SyncScanShmemSize(void);
struct VacuumParams;
extern void heap_vacuum_rel(Relation onerel,
struct VacuumParams *params, BufferAccessStrategy bstrategy);
extern void parallel_vacuum_main(dsm_segment *seg, shm_toc *toc);
/* in heap/heapam_visibility.c */
extern bool HeapTupleSatisfiesVisibility(HeapTuple stup, Snapshot snapshot,
......
......@@ -33,7 +33,8 @@ typedef struct ParallelContext
{
dlist_node node;
SubTransactionId subid;
int nworkers;
int nworkers; /* Maximum number of workers to launch */
int nworkers_to_launch; /* Actual number of workers to launch */
int nworkers_launched;
char *library_name;
char *function_name;
......@@ -63,6 +64,7 @@ extern ParallelContext *CreateParallelContext(const char *library_name,
const char *function_name, int nworkers);
extern void InitializeParallelDSM(ParallelContext *pcxt);
extern void ReinitializeParallelDSM(ParallelContext *pcxt);
extern void ReinitializeParallelWorkers(ParallelContext *pcxt, int nworkers_to_launch);
extern void LaunchParallelWorkers(ParallelContext *pcxt);
extern void WaitForParallelWorkersToAttach(ParallelContext *pcxt);
extern void WaitForParallelWorkersToFinish(ParallelContext *pcxt);
......
......@@ -222,6 +222,13 @@ typedef struct VacuumParams
* default value depends on reloptions */
VacOptTernaryValue truncate; /* Truncate empty pages at the end,
* default value depends on reloptions */
/*
* The number of parallel vacuum workers. 0 by default which means choose
* based on the number of indexes. -1 indicates a parallel vacuum is
* disabled.
*/
int nworkers;
} VacuumParams;
/* GUC parameters */
......@@ -231,6 +238,11 @@ extern int vacuum_freeze_table_age;
extern int vacuum_multixact_freeze_min_age;
extern int vacuum_multixact_freeze_table_age;
/* Variables for cost-based parallel vacuum */
extern pg_atomic_uint32 *VacuumSharedCostBalance;
extern pg_atomic_uint32 *VacuumActiveNWorkers;
extern int VacuumCostBalanceLocal;
/* in commands/vacuum.c */
extern void ExecVacuum(ParseState *pstate, VacuumStmt *vacstmt, bool isTopLevel);
......
......@@ -92,6 +92,40 @@ CONTEXT: SQL function "do_analyze" statement 1
SQL function "wrap_do_analyze" statement 1
VACUUM FULL vactst;
VACUUM (DISABLE_PAGE_SKIPPING) vaccluster;
-- PARALLEL option
CREATE TABLE pvactst (i INT, a INT[], p POINT) with (autovacuum_enabled = off);
INSERT INTO pvactst SELECT i, array[1,2,3], point(i, i+1) FROM generate_series(1,1000) i;
CREATE INDEX btree_pvactst ON pvactst USING btree (i);
CREATE INDEX hash_pvactst ON pvactst USING hash (i);
CREATE INDEX brin_pvactst ON pvactst USING brin (i);
CREATE INDEX gin_pvactst ON pvactst USING gin (a);
CREATE INDEX gist_pvactst ON pvactst USING gist (p);
CREATE INDEX spgist_pvactst ON pvactst USING spgist (p);
-- VACUUM invokes parallel index cleanup
SET min_parallel_index_scan_size to 0;
VACUUM (PARALLEL 2) pvactst;
-- VACUUM invokes parallel bulk-deletion
UPDATE pvactst SET i = i WHERE i < 1000;
VACUUM (PARALLEL 2) pvactst;
UPDATE pvactst SET i = i WHERE i < 1000;
VACUUM (PARALLEL 0) pvactst; -- disable parallel vacuum
VACUUM (PARALLEL -1) pvactst; -- error
ERROR: parallel vacuum degree must be between 0 and 1024
LINE 1: VACUUM (PARALLEL -1) pvactst;
^
VACUUM (PARALLEL 2, INDEX_CLEANUP FALSE) pvactst;
VACUUM (PARALLEL 2, FULL TRUE) pvactst; -- error, cannot use both PARALLEL and FULL
ERROR: cannot specify both FULL and PARALLEL options
VACUUM (PARALLEL) pvactst; -- error, cannot use PARALLEL option without parallel degree
ERROR: parallel option requires a value between 0 and 1024
LINE 1: VACUUM (PARALLEL) pvactst;
^
CREATE TEMPORARY TABLE tmp (a int PRIMARY KEY);
CREATE INDEX tmp_idx1 ON tmp (a);
VACUUM (PARALLEL 1) tmp; -- disables parallel vacuum option
WARNING: disabling parallel option of vacuum on "tmp" --- cannot vacuum temporary tables in parallel
RESET min_parallel_index_scan_size;
DROP TABLE pvactst;
-- INDEX_CLEANUP option
CREATE TABLE no_index_cleanup (i INT PRIMARY KEY, t TEXT);
-- Use uncompressed data stored in toast.
......
......@@ -75,6 +75,37 @@ VACUUM FULL vactst;
VACUUM (DISABLE_PAGE_SKIPPING) vaccluster;
-- PARALLEL option
CREATE TABLE pvactst (i INT, a INT[], p POINT) with (autovacuum_enabled = off);
INSERT INTO pvactst SELECT i, array[1,2,3], point(i, i+1) FROM generate_series(1,1000) i;
CREATE INDEX btree_pvactst ON pvactst USING btree (i);
CREATE INDEX hash_pvactst ON pvactst USING hash (i);
CREATE INDEX brin_pvactst ON pvactst USING brin (i);
CREATE INDEX gin_pvactst ON pvactst USING gin (a);
CREATE INDEX gist_pvactst ON pvactst USING gist (p);
CREATE INDEX spgist_pvactst ON pvactst USING spgist (p);
-- VACUUM invokes parallel index cleanup
SET min_parallel_index_scan_size to 0;
VACUUM (PARALLEL 2) pvactst;
-- VACUUM invokes parallel bulk-deletion
UPDATE pvactst SET i = i WHERE i < 1000;
VACUUM (PARALLEL 2) pvactst;
UPDATE pvactst SET i = i WHERE i < 1000;
VACUUM (PARALLEL 0) pvactst; -- disable parallel vacuum
VACUUM (PARALLEL -1) pvactst; -- error
VACUUM (PARALLEL 2, INDEX_CLEANUP FALSE) pvactst;
VACUUM (PARALLEL 2, FULL TRUE) pvactst; -- error, cannot use both PARALLEL and FULL
VACUUM (PARALLEL) pvactst; -- error, cannot use PARALLEL option without parallel degree
CREATE TEMPORARY TABLE tmp (a int PRIMARY KEY);
CREATE INDEX tmp_idx1 ON tmp (a);
VACUUM (PARALLEL 1) tmp; -- disables parallel vacuum option
RESET min_parallel_index_scan_size;
DROP TABLE pvactst;
-- INDEX_CLEANUP option
CREATE TABLE no_index_cleanup (i INT PRIMARY KEY, t TEXT);
-- Use uncompressed data stored in toast.
......
......@@ -1216,7 +1216,11 @@ LPVOID
LPWSTR
LSEG
LUID
LVDeadTuples
LVParallelState
LVRelStats
LVShared
LVSharedIndStats
LWLock
LWLockHandle
LWLockMinimallyPadded
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment