Commit 7526e102 authored by Alvaro Herrera

BRIN auto-summarization

Previously, only VACUUM would cause a page range to get initially
summarized by BRIN indexes, which for some use cases happens too long
after the inserts occur.  To avoid the delay, have brininsert request a
summarization run for the previous range as soon as the first tuple is
inserted into the first page of the next range.  Autovacuum is in charge
of processing these requests, after doing all the regular vacuuming and
analyzing work on tables.
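The trigger condition described above can be sketched in isolation. This is a simplified illustration rather than the code from the commit: `range_start` and `should_request_summarization` are hypothetical names, block numbers are plain `uint32_t` values, and the first tuple on a page is assumed to have offset 1. The commit's brininsert additionally checks that the previous range has no summary tuple yet before filing the request; that lookup is omitted here.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

/* First heap block of the page range that contains blk. */
static uint32_t
range_start(uint32_t blk, uint32_t pages_per_range)
{
    assert(pages_per_range > 0);
    return (blk / pages_per_range) * pages_per_range;
}

/*
 * Hypothetical helper: true when an insertion at (blk, offset) is the first
 * tuple on the first page of a range other than the first one, i.e. the
 * moment at which the previous range becomes a candidate for summarization.
 */
static bool
should_request_summarization(bool autosummarize, uint32_t blk,
                             uint16_t offset, uint32_t pages_per_range)
{
    uint32_t start = range_start(blk, pages_per_range);

    return autosummarize &&
        start > 0 &&        /* there is a previous range */
        start == blk &&     /* insertion hit the first page of its range */
        offset == 1;        /* first tuple on that page */
}
```

With 128 pages per range, the first tuple inserted on block 128 would request summarization of the range ending at block 127, while an insert on block 129, or a second tuple on block 128, would not.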

This doesn't impose any new tasks on autovacuum, because autovacuum was
already in charge of doing summarizations.  The only actual effect is to
change the timing, i.e. summarization occurs earlier.  For this reason, we
don't go to any great lengths to record these requests very robustly; if
they are lost because of a server crash or restart, the summarization
will happen at a later time anyway.

Most of the new code here is in autovacuum, which can now be told about
"work items" to process.  This mechanism could also be used for other
things, such as GIN pending list cleaning and perhaps visibility map bit
setting, both of which are currently invoked during vacuum but do not
really depend on vacuum taking place.
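The idea of a bounded, deliberately lossy work-item queue can be sketched as follows. This is only an illustration under simplifying assumptions: it uses a flat in-process array instead of the shared-memory linked lists in the commit, it has no locking, and all names (`WorkItem`, `request_work`, `claim_work`, `finish_work`, `NUM_SLOTS`) are hypothetical.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

#define NUM_SLOTS 4             /* deliberately tiny for illustration */

typedef struct WorkItem
{
    bool     used;              /* slot holds a pending request */
    bool     active;            /* a worker has claimed it */
    uint32_t relation;          /* which index to work on */
    uint32_t block;             /* which page range, by block number */
} WorkItem;

static WorkItem slots[NUM_SLOTS];   /* zero-initialized: all slots free */

/* Record a request; returns false (request dropped) if no slot is free. */
static bool
request_work(uint32_t relation, uint32_t block)
{
    for (int i = 0; i < NUM_SLOTS; i++)
    {
        if (!slots[i].used)
        {
            slots[i].used = true;
            slots[i].active = false;
            slots[i].relation = relation;
            slots[i].block = block;
            return true;
        }
    }
    return false;
}

/* Claim one pending item; returns its slot index, or -1 if none is pending. */
static int
claim_work(void)
{
    for (int i = 0; i < NUM_SLOTS; i++)
    {
        if (slots[i].used && !slots[i].active)
        {
            slots[i].active = true;
            return i;
        }
    }
    return -1;
}

/* Return a finished item's slot to the pool. */
static void
finish_work(int i)
{
    assert(i >= 0 && i < NUM_SLOTS && slots[i].active);
    slots[i].used = false;
    slots[i].active = false;
}
```

Dropping a request when the queue is full is acceptable here for the reason the message gives: a lost request only delays summarization until a later vacuum.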

The requests are at the page range level, a granularity for which we did
not have SQL-level access; we only had index-level summarization
requests via brin_summarize_new_values().  It seems reasonable to add
SQL-level access to range-level summarization too, so add a function
brin_summarize_range() to do that.
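How a block number might be turned into the interval of heap blocks to scan can be sketched as below. This is a standalone illustration, not the commit's brinsummarize: `scan_bounds` is a hypothetical helper, and `ALL_RANGES` is an assumed stand-in for BRIN_ALL_BLOCKRANGES, taken here to be the largest 32-bit value.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

#define ALL_RANGES UINT32_MAX   /* assumed stand-in for BRIN_ALL_BLOCKRANGES */

/*
 * Compute the half-open block interval [*start, *end) to scan.
 * Returns false when there is nothing to scan.
 */
static bool
scan_bounds(uint32_t page_range, uint32_t pages_per_range,
            uint32_t nblocks, uint32_t *start, uint32_t *end)
{
    assert(pages_per_range > 0);

    if (nblocks == 0)
        return false;           /* empty table */

    if (page_range == ALL_RANGES)
    {
        *start = 0;
        *end = nblocks;
        return true;
    }

    /* round down to the first block of the containing range */
    *start = (page_range / pages_per_range) * pages_per_range;
    if (*start >= nblocks)
        return false;           /* range lies beyond the end of the table */

    /* clamp the end of the range to the table size, avoiding overflow */
    *end = (nblocks - *start > pages_per_range)
        ? *start + pages_per_range
        : nblocks;
    return true;
}
```

For a 1000-block table with 128 pages per range, block 130 maps to the interval [128, 256), and block 999 maps to the truncated final range [896, 1000).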

Authors: Álvaro Herrera, based on sketch from Simon Riggs.
Reviewed-by: Thomas Munro.
Discussion: https://postgr.es/m/20170301045823.vneqdqkmsd4as4ds@alvherre.pgsql
parent 7220c7b3
@@ -74,9 +74,14 @@
 tuple; those tuples remain unsummarized until a summarization run is
 invoked later, creating initial summaries.
 This process can be invoked manually using the
-<function>brin_summarize_new_values(regclass)</function> function,
-or automatically when <command>VACUUM</command> processes the table.
+<function>brin_summarize_range(regclass, bigint)</function> or
+<function>brin_summarize_new_values(regclass)</function> functions;
+automatically when <command>VACUUM</command> processes the table;
+or by automatic summarization executed by autovacuum, as insertions
+occur. (This last trigger is disabled by default and can be enabled
+with the <literal>autosummarize</literal> parameter.)
 </para>
 </sect2>
 </sect1>
......
@@ -19683,6 +19683,13 @@ postgres=# SELECT * FROM pg_walfile_name_offset(pg_stop_backup());
 <entry><type>integer</type></entry>
 <entry>summarize page ranges not already summarized</entry>
 </row>
+<row>
+<entry>
+<literal><function>brin_summarize_range(<parameter>index</> <type>regclass</>, <parameter>blockNumber</> <type>bigint</type>)</function></literal>
+</entry>
+<entry><type>integer</type></entry>
+<entry>summarize the page range covering the given block, if not already summarized</entry>
+</row>
 <row>
 <entry>
 <literal><function>gin_clean_pending_list(<parameter>index</> <type>regclass</>)</function></literal>
@@ -19700,7 +19707,8 @@ postgres=# SELECT * FROM pg_walfile_name_offset(pg_stop_backup());
 that are not currently summarized by the index; for any such range
 it creates a new summary index tuple by scanning the table pages.
 It returns the number of new page range summaries that were inserted
-into the index.
+into the index. <function>brin_summarize_range</> does the same, except
+it only summarizes the range that covers the given block number.
 </para>
 <para>
<para> <para>
......
@@ -382,7 +382,7 @@ CREATE [ UNIQUE ] INDEX [ CONCURRENTLY ] [ [ IF NOT EXISTS ] <replaceable class=
 </variablelist>
 <para>
-<acronym>BRIN</> indexes accept a different parameter:
+<acronym>BRIN</> indexes accept different parameters:
 </para>
 <variablelist>
@@ -396,6 +396,16 @@ CREATE [ UNIQUE ] INDEX [ CONCURRENTLY ] [ [ IF NOT EXISTS ] <replaceable class=
 </para>
 </listitem>
 </varlistentry>
+<varlistentry>
+<term><literal>autosummarize</></term>
+<listitem>
+<para>
+Defines whether a summarization run is invoked for the previous page
+range whenever an insertion is detected on the next one.
+</para>
+</listitem>
+</varlistentry>
 </variablelist>
 </refsect2>
......
@@ -26,6 +26,7 @@
 #include "catalog/pg_am.h"
 #include "miscadmin.h"
 #include "pgstat.h"
+#include "postmaster/autovacuum.h"
 #include "storage/bufmgr.h"
 #include "storage/freespace.h"
 #include "utils/builtins.h"
@@ -60,10 +61,12 @@ typedef struct BrinOpaque
     BrinDesc *bo_bdesc;
 } BrinOpaque;
+#define BRIN_ALL_BLOCKRANGES InvalidBlockNumber
 static BrinBuildState *initialize_brin_buildstate(Relation idxRel,
                            BrinRevmap *revmap, BlockNumber pagesPerRange);
 static void terminate_brin_buildstate(BrinBuildState *state);
-static void brinsummarize(Relation index, Relation heapRel,
+static void brinsummarize(Relation index, Relation heapRel, BlockNumber pageRange,
               double *numSummarized, double *numExisting);
 static void form_and_insert_tuple(BrinBuildState *state);
 static void union_tuples(BrinDesc *bdesc, BrinMemTuple *a,
@@ -126,8 +129,11 @@ brinhandler(PG_FUNCTION_ARGS)
  * with those of the new tuple. If the tuple values are not consistent with
  * the summary tuple, we need to update the index tuple.
  *
+ * If autosummarization is enabled, check if we need to summarize the previous
+ * page range.
+ *
  * If the range is not currently summarized (i.e. the revmap returns NULL for
- * it), there's nothing to do.
+ * it), there's nothing to do for this tuple.
  */
 bool
 brininsert(Relation idxRel, Datum *values, bool *nulls,
bool bool
brininsert(Relation idxRel, Datum *values, bool *nulls, brininsert(Relation idxRel, Datum *values, bool *nulls,
@@ -136,30 +142,59 @@ brininsert(Relation idxRel, Datum *values, bool *nulls,
            IndexInfo *indexInfo)
 {
     BlockNumber pagesPerRange;
+    BlockNumber origHeapBlk;
+    BlockNumber heapBlk;
     BrinDesc *bdesc = (BrinDesc *) indexInfo->ii_AmCache;
     BrinRevmap *revmap;
     Buffer buf = InvalidBuffer;
     MemoryContext tupcxt = NULL;
     MemoryContext oldcxt = CurrentMemoryContext;
+    bool autosummarize = BrinGetAutoSummarize(idxRel);
     revmap = brinRevmapInitialize(idxRel, &pagesPerRange, NULL);
+    /*
+     * origHeapBlk is the block number where the insertion occurred. heapBlk
+     * is the first block in the corresponding page range.
+     */
+    origHeapBlk = ItemPointerGetBlockNumber(heaptid);
+    heapBlk = (origHeapBlk / pagesPerRange) * pagesPerRange;
     for (;;)
     {
         bool need_insert = false;
         OffsetNumber off;
         BrinTuple *brtup;
         BrinMemTuple *dtup;
-        BlockNumber heapBlk;
         int keyno;
         CHECK_FOR_INTERRUPTS();
-        heapBlk = ItemPointerGetBlockNumber(heaptid);
-        /* normalize the block number to be the first block in the range */
-        heapBlk = (heapBlk / pagesPerRange) * pagesPerRange;
-        brtup = brinGetTupleForHeapBlock(revmap, heapBlk, &buf, &off, NULL,
-                                         BUFFER_LOCK_SHARE, NULL);
+        /*
+         * If auto-summarization is enabled and we just inserted the first
+         * tuple into the first block of a new non-first page range, request a
+         * summarization run of the previous range.
+         */
+        if (autosummarize &&
+            heapBlk > 0 &&
+            heapBlk == origHeapBlk &&
+            ItemPointerGetOffsetNumber(heaptid) == FirstOffsetNumber)
+        {
+            BlockNumber lastPageRange = heapBlk - 1;
+            BrinTuple *lastPageTuple;
+            lastPageTuple =
+                brinGetTupleForHeapBlock(revmap, lastPageRange, &buf, &off,
+                                         NULL, BUFFER_LOCK_SHARE, NULL);
+            if (!lastPageTuple)
+                AutoVacuumRequestWork(AVW_BRINSummarizeRange,
+                                      RelationGetRelid(idxRel),
+                                      lastPageRange);
+            brin_free_tuple(lastPageTuple);
+        }
+        brtup = brinGetTupleForHeapBlock(revmap, heapBlk, &buf, &off,
+                                         NULL, BUFFER_LOCK_SHARE, NULL);
         /* if range is unsummarized, there's nothing to do */
         if (!brtup)
@@ -747,7 +782,7 @@ brinvacuumcleanup(IndexVacuumInfo *info, IndexBulkDeleteResult *stats)
     brin_vacuum_scan(info->index, info->strategy);
-    brinsummarize(info->index, heapRel,
+    brinsummarize(info->index, heapRel, BRIN_ALL_BLOCKRANGES,
                   &stats->num_index_tuples, &stats->num_index_tuples);
     heap_close(heapRel, AccessShareLock);
@@ -765,7 +800,8 @@ brinoptions(Datum reloptions, bool validate)
     BrinOptions *rdopts;
     int numoptions;
     static const relopt_parse_elt tab[] = {
-        {"pages_per_range", RELOPT_TYPE_INT, offsetof(BrinOptions, pagesPerRange)}
+        {"pages_per_range", RELOPT_TYPE_INT, offsetof(BrinOptions, pagesPerRange)},
+        {"autosummarize", RELOPT_TYPE_BOOL, offsetof(BrinOptions, autosummarize)}
     };
     options = parseRelOptions(reloptions, validate, RELOPT_KIND_BRIN,
@@ -791,13 +827,40 @@ brinoptions(Datum reloptions, bool validate)
  */
 Datum
 brin_summarize_new_values(PG_FUNCTION_ARGS)
+{
+    Datum relation = PG_GETARG_DATUM(0);
+    return DirectFunctionCall2(brin_summarize_range,
+                               relation,
+                               Int64GetDatum((int64) BRIN_ALL_BLOCKRANGES));
+}
+/*
+ * SQL-callable function to summarize the indicated page range, if not already
+ * summarized. If the second argument is BRIN_ALL_BLOCKRANGES, all
+ * unsummarized ranges are summarized.
+ */
+Datum
+brin_summarize_range(PG_FUNCTION_ARGS)
 {
     Oid indexoid = PG_GETARG_OID(0);
+    int64 heapBlk64 = PG_GETARG_INT64(1);
+    BlockNumber heapBlk;
     Oid heapoid;
     Relation indexRel;
     Relation heapRel;
     double numSummarized = 0;
+    if (heapBlk64 > BRIN_ALL_BLOCKRANGES || heapBlk64 < 0)
+    {
+        char *blk = psprintf(INT64_FORMAT, heapBlk64);
+        ereport(ERROR,
+                (errcode(ERRCODE_NUMERIC_VALUE_OUT_OF_RANGE),
+                 errmsg("block number out of range: %s", blk)));
+    }
+    heapBlk = (BlockNumber) heapBlk64;
     /*
      * We must lock table before index to avoid deadlocks. However, if the
      * passed indexoid isn't an index then IndexGetRelation() will fail.
@@ -837,7 +900,7 @@ brin_summarize_new_values(PG_FUNCTION_ARGS)
                     RelationGetRelationName(indexRel))));
     /* OK, do it */
-    brinsummarize(indexRel, heapRel, &numSummarized, NULL);
+    brinsummarize(indexRel, heapRel, heapBlk, &numSummarized, NULL);
     relation_close(indexRel, ShareUpdateExclusiveLock);
     relation_close(heapRel, ShareUpdateExclusiveLock);
@@ -1063,17 +1126,17 @@ summarize_range(IndexInfo *indexInfo, BrinBuildState *state, Relation heapRel,
 }
 /*
- * Scan a complete BRIN index, and summarize each page range that's not already
- * summarized. The index and heap must have been locked by caller in at
- * least ShareUpdateExclusiveLock mode.
+ * Summarize page ranges that are not already summarized. If pageRange is
+ * BRIN_ALL_BLOCKRANGES then the whole table is scanned; otherwise, only the
+ * page range containing the given heap page number is scanned.
  *
  * For each new index tuple inserted, *numSummarized (if not NULL) is
  * incremented; for each existing tuple, *numExisting (if not NULL) is
  * incremented.
  */
 static void
-brinsummarize(Relation index, Relation heapRel, double *numSummarized,
-              double *numExisting)
+brinsummarize(Relation index, Relation heapRel, BlockNumber pageRange,
+              double *numSummarized, double *numExisting)
 {
     BrinRevmap *revmap;
     BrinBuildState *state = NULL;
@@ -1082,15 +1145,40 @@ brinsummarize(Relation index, Relation heapRel, double *numSummarized,
     BlockNumber heapBlk;
     BlockNumber pagesPerRange;
     Buffer buf;
+    BlockNumber startBlk;
+    BlockNumber endBlk;
+    /* determine range of pages to process; nothing to do for an empty table */
+    heapNumBlocks = RelationGetNumberOfBlocks(heapRel);
+    if (heapNumBlocks == 0)
+        return;
     revmap = brinRevmapInitialize(index, &pagesPerRange, NULL);
+    if (pageRange == BRIN_ALL_BLOCKRANGES)
+    {
+        startBlk = 0;
+        endBlk = heapNumBlocks;
+    }
+    else
+    {
+        startBlk = (pageRange / pagesPerRange) * pagesPerRange;
+        /* Nothing to do if start point is beyond end of table */
+        if (startBlk > heapNumBlocks)
+        {
+            brinRevmapTerminate(revmap);
+            return;
+        }
+        endBlk = startBlk + pagesPerRange;
+        if (endBlk > heapNumBlocks)
+            endBlk = heapNumBlocks;
+    }
     /*
      * Scan the revmap to find unsummarized items.
      */
     buf = InvalidBuffer;
-    heapNumBlocks = RelationGetNumberOfBlocks(heapRel);
-    for (heapBlk = 0; heapBlk < heapNumBlocks; heapBlk += pagesPerRange)
+    for (heapBlk = startBlk; heapBlk < endBlk; heapBlk += pagesPerRange)
     {
         BrinTuple *tup;
         OffsetNumber off;
......
@@ -205,7 +205,11 @@ brinGetTupleForHeapBlock(BrinRevmap *revmap, BlockNumber heapBlk,
     /* normalize the heap block number to be the first page in the range */
     heapBlk = (heapBlk / revmap->rm_pagesPerRange) * revmap->rm_pagesPerRange;
-    /* Compute the revmap page number we need */
+    /*
+     * Compute the revmap page number we need. If Invalid is returned (i.e.,
+     * the revmap page hasn't been created yet), the requested page range is
+     * not summarized.
+     */
     mapBlk = revmap_get_blkno(revmap, heapBlk);
     if (mapBlk == InvalidBlockNumber)
     {
......
...@@ -92,6 +92,15 @@ ...@@ -92,6 +92,15 @@
static relopt_bool boolRelOpts[] = static relopt_bool boolRelOpts[] =
{ {
{
{
"autosummarize",
"Enables automatic summarization on this BRIN index",
RELOPT_KIND_BRIN,
AccessExclusiveLock
},
false
},
{ {
{ {
"autovacuum_enabled", "autovacuum_enabled",
......
...@@ -92,7 +92,9 @@ ...@@ -92,7 +92,9 @@
#include "storage/procsignal.h" #include "storage/procsignal.h"
#include "storage/sinvaladt.h" #include "storage/sinvaladt.h"
#include "tcop/tcopprot.h" #include "tcop/tcopprot.h"
#include "utils/dsa.h"
#include "utils/fmgroids.h" #include "utils/fmgroids.h"
#include "utils/fmgrprotos.h"
#include "utils/lsyscache.h" #include "utils/lsyscache.h"
#include "utils/memutils.h" #include "utils/memutils.h"
#include "utils/ps_status.h" #include "utils/ps_status.h"
...@@ -252,9 +254,10 @@ typedef enum ...@@ -252,9 +254,10 @@ typedef enum
* av_runningWorkers the WorkerInfo non-free queue * av_runningWorkers the WorkerInfo non-free queue
* av_startingWorker pointer to WorkerInfo currently being started (cleared by * av_startingWorker pointer to WorkerInfo currently being started (cleared by
* the worker itself as soon as it's up and running) * the worker itself as soon as it's up and running)
* av_dsa_handle handle for allocatable shared memory
* *
* This struct is protected by AutovacuumLock, except for av_signal and parts * This struct is protected by AutovacuumLock, except for av_signal and parts
* of the worker list (see above). * of the worker list (see above). av_dsa_handle is readable unlocked.
*------------- *-------------
*/ */
typedef struct typedef struct
...@@ -264,6 +267,8 @@ typedef struct ...@@ -264,6 +267,8 @@ typedef struct
dlist_head av_freeWorkers; dlist_head av_freeWorkers;
dlist_head av_runningWorkers; dlist_head av_runningWorkers;
WorkerInfo av_startingWorker; WorkerInfo av_startingWorker;
dsa_handle av_dsa_handle;
dsa_pointer av_workitems;
} AutoVacuumShmemStruct; } AutoVacuumShmemStruct;
static AutoVacuumShmemStruct *AutoVacuumShmem; static AutoVacuumShmemStruct *AutoVacuumShmem;
...@@ -278,6 +283,32 @@ static MemoryContext DatabaseListCxt = NULL; ...@@ -278,6 +283,32 @@ static MemoryContext DatabaseListCxt = NULL;
/* Pointer to my own WorkerInfo, valid on each worker */ /* Pointer to my own WorkerInfo, valid on each worker */
static WorkerInfo MyWorkerInfo = NULL; static WorkerInfo MyWorkerInfo = NULL;
/*
* Autovacuum workitem array, stored in AutoVacuumShmem->av_workitems. This
* list is mostly protected by AutovacuumLock, except that once an item is
* marked 'active' other processes must not modify its work-identifying
* members, though changing its list pointers is okay.
*/
typedef struct AutoVacuumWorkItem
{
AutoVacuumWorkItemType avw_type;
Oid avw_database;
Oid avw_relation;
BlockNumber avw_blockNumber;
bool avw_active;
dsa_pointer avw_next; /* doubly linked list pointers */
dsa_pointer avw_prev;
} AutoVacuumWorkItem;
#define NUM_WORKITEMS 256
typedef struct
{
dsa_pointer avs_usedItems;
dsa_pointer avs_freeItems;
} AutovacWorkItems;
static dsa_area *AutoVacuumDSA = NULL;
/* PID of launcher, valid only in worker while shutting down */ /* PID of launcher, valid only in worker while shutting down */
int AutovacuumLauncherPid = 0; int AutovacuumLauncherPid = 0;
...@@ -316,11 +347,16 @@ static AutoVacOpts *extract_autovac_opts(HeapTuple tup, ...@@ -316,11 +347,16 @@ static AutoVacOpts *extract_autovac_opts(HeapTuple tup,
static PgStat_StatTabEntry *get_pgstat_tabentry_relid(Oid relid, bool isshared, static PgStat_StatTabEntry *get_pgstat_tabentry_relid(Oid relid, bool isshared,
PgStat_StatDBEntry *shared, PgStat_StatDBEntry *shared,
PgStat_StatDBEntry *dbentry); PgStat_StatDBEntry *dbentry);
static void perform_work_item(AutoVacuumWorkItem *workitem);
static void autovac_report_activity(autovac_table *tab); static void autovac_report_activity(autovac_table *tab);
static void autovac_report_workitem(AutoVacuumWorkItem *workitem,
const char *nspname, const char *relname);
static void av_sighup_handler(SIGNAL_ARGS); static void av_sighup_handler(SIGNAL_ARGS);
static void avl_sigusr2_handler(SIGNAL_ARGS); static void avl_sigusr2_handler(SIGNAL_ARGS);
static void avl_sigterm_handler(SIGNAL_ARGS); static void avl_sigterm_handler(SIGNAL_ARGS);
static void autovac_refresh_stats(void); static void autovac_refresh_stats(void);
static void remove_wi_from_list(dsa_pointer *list, dsa_pointer wi_ptr);
static void add_wi_to_list(dsa_pointer *list, dsa_pointer wi_ptr);
...@@ -574,6 +610,28 @@ AutoVacLauncherMain(int argc, char *argv[]) ...@@ -574,6 +610,28 @@ AutoVacLauncherMain(int argc, char *argv[])
*/ */
rebuild_database_list(InvalidOid); rebuild_database_list(InvalidOid);
/*
* Set up our DSA so that backends can install work-item requests. It may
* already exist as created by a previous launcher.
*/
if (!AutoVacuumShmem->av_dsa_handle)
{
LWLockAcquire(AutovacuumLock, LW_EXCLUSIVE);
AutoVacuumDSA = dsa_create(AutovacuumLock->tranche);
/* make sure it doesn't go away even if we do */
dsa_pin(AutoVacuumDSA);
dsa_pin_mapping(AutoVacuumDSA);
AutoVacuumShmem->av_dsa_handle = dsa_get_handle(AutoVacuumDSA);
/* delay array allocation until first request */
AutoVacuumShmem->av_workitems = InvalidDsaPointer;
LWLockRelease(AutovacuumLock);
}
else
{
AutoVacuumDSA = dsa_attach(AutoVacuumShmem->av_dsa_handle);
dsa_pin_mapping(AutoVacuumDSA);
}
/* loop until shutdown request */ /* loop until shutdown request */
while (!got_SIGTERM) while (!got_SIGTERM)
{ {
...@@ -1617,6 +1675,14 @@ AutoVacWorkerMain(int argc, char *argv[]) ...@@ -1617,6 +1675,14 @@ AutoVacWorkerMain(int argc, char *argv[])
{ {
char dbname[NAMEDATALEN]; char dbname[NAMEDATALEN];
if (AutoVacuumShmem->av_dsa_handle)
{
/* First use of DSA in this worker, so attach to it */
Assert(!AutoVacuumDSA);
AutoVacuumDSA = dsa_attach(AutoVacuumShmem->av_dsa_handle);
dsa_pin_mapping(AutoVacuumDSA);
}
/* /*
* Report autovac startup to the stats collector. We deliberately do * Report autovac startup to the stats collector. We deliberately do
* this before InitPostgres, so that the last_autovac_time will get * this before InitPostgres, so that the last_autovac_time will get
...@@ -2466,6 +2532,69 @@ deleted: ...@@ -2466,6 +2532,69 @@ deleted:
VacuumCostLimit = stdVacuumCostLimit; VacuumCostLimit = stdVacuumCostLimit;
} }
/*
* Perform additional work items, as requested by backends.
*/
if (AutoVacuumShmem->av_workitems)
{
dsa_pointer wi_ptr;
AutovacWorkItems *workitems;
LWLockAcquire(AutovacuumLock, LW_EXCLUSIVE);
/*
* Scan the list of pending items, and process the inactive ones in
* our database.
*/
workitems = (AutovacWorkItems *)
dsa_get_address(AutoVacuumDSA, AutoVacuumShmem->av_workitems);
wi_ptr = workitems->avs_usedItems;
while (wi_ptr != InvalidDsaPointer)
{
AutoVacuumWorkItem *workitem;
workitem = (AutoVacuumWorkItem *)
dsa_get_address(AutoVacuumDSA, wi_ptr);
if (workitem->avw_database == MyDatabaseId && !workitem->avw_active)
{
dsa_pointer next_ptr;
/* claim this one */
workitem->avw_active = true;
LWLockRelease(AutovacuumLock);
perform_work_item(workitem);
/*
* Check for config changes before acquiring lock for further
* jobs.
*/
CHECK_FOR_INTERRUPTS();
if (got_SIGHUP)
{
got_SIGHUP = false;
ProcessConfigFile(PGC_SIGHUP);
}
LWLockAcquire(AutovacuumLock, LW_EXCLUSIVE);
/* Put the array item back for the next user */
next_ptr = workitem->avw_next;
remove_wi_from_list(&workitems->avs_usedItems, wi_ptr);
add_wi_to_list(&workitems->avs_freeItems, wi_ptr);
wi_ptr = next_ptr;
}
else
wi_ptr = workitem->avw_next;
}
/* all done */
LWLockRelease(AutovacuumLock);
}
/* /*
* We leak table_toast_map here (among other things), but since we're * We leak table_toast_map here (among other things), but since we're
* going away soon, it's not a problem. * going away soon, it's not a problem.
...@@ -2498,6 +2627,103 @@ deleted: ...@@ -2498,6 +2627,103 @@ deleted:
CommitTransactionCommand(); CommitTransactionCommand();
} }
/*
* Execute a previously registered work item.
*/
static void
perform_work_item(AutoVacuumWorkItem *workitem)
{
char *cur_datname = NULL;
char *cur_nspname = NULL;
char *cur_relname = NULL;
/*
* Note we do not store table info in MyWorkerInfo, since this is not
* vacuuming proper.
*/
/*
* Save the relation name for a possible error message, to avoid a catalog
* lookup in case of an error. If any of these return NULL, then the
* relation has been dropped since last we checked; skip it. Note: they
* must live in a long-lived memory context because we call vacuum and
* analyze in different transactions.
*/
cur_relname = get_rel_name(workitem->avw_relation);
cur_nspname = get_namespace_name(get_rel_namespace(workitem->avw_relation));
cur_datname = get_database_name(MyDatabaseId);
if (!cur_relname || !cur_nspname || !cur_datname)
goto deleted2;
autovac_report_workitem(workitem, cur_nspname, cur_relname);
/*
* We will abort the current work item if something errors out, and
* continue with the next one; in particular, this happens if we are
* interrupted with SIGINT. Note that this means that the work item list
* can be lossy.
*/
PG_TRY();
{
/* have at it */
MemoryContextSwitchTo(TopTransactionContext);
switch (workitem->avw_type)
{
case AVW_BRINSummarizeRange:
DirectFunctionCall2(brin_summarize_range,
ObjectIdGetDatum(workitem->avw_relation),
Int64GetDatum((int64) workitem->avw_blockNumber));
break;
default:
elog(WARNING, "unrecognized work item found: type %d",
workitem->avw_type);
break;
}
/*
* Clear a possible query-cancel signal, to avoid a late reaction to
* an automatically-sent signal because of vacuuming the current table
* (we're done with it, so it would make no sense to cancel at this
* point.)
*/
QueryCancelPending = false;
}
PG_CATCH();
{
/*
* Abort the transaction, start a new one, and proceed with the next
* table in our list.
*/
HOLD_INTERRUPTS();
errcontext("processing work entry for relation \"%s.%s.%s\"",
cur_datname, cur_nspname, cur_relname);
EmitErrorReport();
/* this resets the PGXACT flags too */
AbortOutOfAnyTransaction();
FlushErrorState();
MemoryContextResetAndDeleteChildren(PortalContext);
/* restart our transaction for the following operations */
StartTransactionCommand();
RESUME_INTERRUPTS();
}
PG_END_TRY();
/* We intentionally do not set did_vacuum here */
/* be tidy */
deleted2:
if (cur_datname)
pfree(cur_datname);
if (cur_nspname)
pfree(cur_nspname);
if (cur_relname)
pfree(cur_relname);
}
/* /*
* extract_autovac_opts * extract_autovac_opts
* *
...@@ -2945,6 +3171,45 @@ autovac_report_activity(autovac_table *tab) ...@@ -2945,6 +3171,45 @@ autovac_report_activity(autovac_table *tab)
pgstat_report_activity(STATE_RUNNING, activity); pgstat_report_activity(STATE_RUNNING, activity);
} }
/*
* autovac_report_workitem
* Report to pgstat that autovacuum is processing a work item
*/
static void
autovac_report_workitem(AutoVacuumWorkItem *workitem,
const char *nspname, const char *relname)
{
char activity[MAX_AUTOVAC_ACTIV_LEN + 12 + 2];
char blk[12 + 2];
int len;
switch (workitem->avw_type)
{
case AVW_BRINSummarizeRange:
snprintf(activity, MAX_AUTOVAC_ACTIV_LEN,
"autovacuum: BRIN summarize");
break;
}
/*
* Report the qualified name of the relation, and the block number if any
*/
len = strlen(activity);
if (BlockNumberIsValid(workitem->avw_blockNumber))
snprintf(blk, sizeof(blk), " %u", workitem->avw_blockNumber);
else
blk[0] = '\0';
snprintf(activity + len, MAX_AUTOVAC_ACTIV_LEN - len,
" %s.%s%s", nspname, relname, blk);
/* Set statement_timestamp() to current time for pg_stat_activity */
SetCurrentStatementStartTimestamp();
pgstat_report_activity(STATE_RUNNING, activity);
}
/* /*
* AutoVacuumingActive * AutoVacuumingActive
* Check GUC vars and report whether the autovacuum process should be * Check GUC vars and report whether the autovacuum process should be
...@@ -2958,6 +3223,113 @@ AutoVacuumingActive(void) ...@@ -2958,6 +3223,113 @@ AutoVacuumingActive(void)
return true; return true;
} }
/*
* Register one work item to be processed by the next autovacuum run in our
* database.
*/
void
AutoVacuumRequestWork(AutoVacuumWorkItemType type, Oid relationId,
BlockNumber blkno)
{
AutovacWorkItems *workitems;
dsa_pointer wi_ptr;
AutoVacuumWorkItem *workitem;
LWLockAcquire(AutovacuumLock, LW_EXCLUSIVE);
/*
* It may be useful to de-duplicate the list upon insertion. For the only
* currently existing caller, this is not necessary.
*/
/* First use in this process? Set up DSA */
if (!AutoVacuumDSA)
{
if (!AutoVacuumShmem->av_dsa_handle)
{
/* autovacuum launcher not started; nothing can be done */
LWLockRelease(AutovacuumLock);
return;
}
AutoVacuumDSA = dsa_attach(AutoVacuumShmem->av_dsa_handle);
dsa_pin_mapping(AutoVacuumDSA);
}
/* First use overall? Allocate work items array */
if (AutoVacuumShmem->av_workitems == InvalidDsaPointer)
{
int i;
AutovacWorkItems *workitems;
AutoVacuumShmem->av_workitems =
dsa_allocate_extended(AutoVacuumDSA,
sizeof(AutovacWorkItems) +
NUM_WORKITEMS * sizeof(AutoVacuumWorkItem),
DSA_ALLOC_NO_OOM);
/* if out of memory, silently disregard the request */
if (AutoVacuumShmem->av_workitems == InvalidDsaPointer)
{
LWLockRelease(AutovacuumLock);
dsa_detach(AutoVacuumDSA);
AutoVacuumDSA = NULL;
return;
}
/* Initialize each array entry as a member of the free list */
workitems = dsa_get_address(AutoVacuumDSA, AutoVacuumShmem->av_workitems);
workitems->avs_usedItems = InvalidDsaPointer;
workitems->avs_freeItems = InvalidDsaPointer;
for (i = 0; i < NUM_WORKITEMS; i++)
{
/* XXX surely there is a simpler way to do this */
wi_ptr = AutoVacuumShmem->av_workitems + sizeof(AutovacWorkItems) +
sizeof(AutoVacuumWorkItem) * i;
workitem = (AutoVacuumWorkItem *) dsa_get_address(AutoVacuumDSA, wi_ptr);
workitem->avw_type = 0;
workitem->avw_database = InvalidOid;
workitem->avw_relation = InvalidOid;
workitem->avw_active = false;
/* put this item in the free list, with both links initialized */
workitem->avw_next = InvalidDsaPointer;
workitem->avw_prev = InvalidDsaPointer;
add_wi_to_list(&workitems->avs_freeItems, wi_ptr);
}
}
workitems = (AutovacWorkItems *)
dsa_get_address(AutoVacuumDSA, AutoVacuumShmem->av_workitems);
/* If array is full, disregard the request */
if (workitems->avs_freeItems == InvalidDsaPointer)
{
LWLockRelease(AutovacuumLock);
dsa_detach(AutoVacuumDSA);
AutoVacuumDSA = NULL;
return;
}
/* remove workitem struct from free list ... */
wi_ptr = workitems->avs_freeItems;
remove_wi_from_list(&workitems->avs_freeItems, wi_ptr);
/* ... initialize it ... */
workitem = dsa_get_address(AutoVacuumDSA, wi_ptr);
workitem->avw_type = type;
workitem->avw_database = MyDatabaseId;
workitem->avw_relation = relationId;
workitem->avw_blockNumber = blkno;
workitem->avw_active = false;
/* ... and put it on autovacuum's to-do list */
add_wi_to_list(&workitems->avs_usedItems, wi_ptr);
LWLockRelease(AutovacuumLock);
dsa_detach(AutoVacuumDSA);
AutoVacuumDSA = NULL;
}
/* /*
* autovac_init * autovac_init
* This is called at postmaster initialization. * This is called at postmaster initialization.
...@@ -3079,3 +3451,59 @@ autovac_refresh_stats(void) ...@@ -3079,3 +3451,59 @@ autovac_refresh_stats(void)
pgstat_clear_snapshot(); pgstat_clear_snapshot();
} }
/*
* Simplistic open-coded list implementation for objects stored in DSA.
* Each item is doubly linked, but we have no tail pointer, and the "prev"
* element of the first item is null, not the list.
*/
/*
* Remove a work item from the given list.
*/
static void
remove_wi_from_list(dsa_pointer *list, dsa_pointer wi_ptr)
{
AutoVacuumWorkItem *workitem = dsa_get_address(AutoVacuumDSA, wi_ptr);
dsa_pointer next = workitem->avw_next;
dsa_pointer prev = workitem->avw_prev;
workitem->avw_next = workitem->avw_prev = InvalidDsaPointer;
if (next != InvalidDsaPointer)
{
workitem = dsa_get_address(AutoVacuumDSA, next);
workitem->avw_prev = prev;
}
if (prev != InvalidDsaPointer)
{
workitem = dsa_get_address(AutoVacuumDSA, prev);
workitem->avw_next = next;
}
else
*list = next;
}
/*
* Add a workitem to the given list
*/
static void
add_wi_to_list(dsa_pointer *list, dsa_pointer wi_ptr)
{
if (*list == InvalidDsaPointer)
{
/* list is empty; item is now singleton */
*list = wi_ptr;
}
else
{
AutoVacuumWorkItem *workitem = dsa_get_address(AutoVacuumDSA, wi_ptr);
AutoVacuumWorkItem *old = dsa_get_address(AutoVacuumDSA, *list);
/* Put item at head of list */
workitem->avw_next = *list;
old->avw_prev = wi_ptr;
*list = wi_ptr;
}
}
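The list helpers above can be tried outside the server. What follows is a minimal sketch, not the committed code: array indexes stand in for dsa_pointer values, index 0 stands in for InvalidDsaPointer, and the names item_ptr, Item, list_remove, list_push and list_length are hypothetical. Unlike add_wi_to_list, the sketch clears the "prev" link on every push so that it does not depend on the caller having reset the item first.

```c
#include <assert.h>
#include <stddef.h>

/* Hypothetical stand-ins: an array index plays the role of a dsa_pointer,
 * and index 0 plays the role of InvalidDsaPointer (slot 0 is never used). */
typedef size_t item_ptr;
#define INVALID_PTR ((item_ptr) 0)
#define NUM_ITEMS 4

typedef struct Item
{
    item_ptr next;
    item_ptr prev;
} Item;

static Item items[NUM_ITEMS + 1];

/* Unlink item p from the list whose head pointer is *list. */
static void
list_remove(item_ptr *list, item_ptr p)
{
    item_ptr next = items[p].next;
    item_ptr prev = items[p].prev;

    items[p].next = items[p].prev = INVALID_PTR;
    if (next != INVALID_PTR)
        items[next].prev = prev;
    if (prev != INVALID_PTR)
        items[prev].next = next;
    else
        *list = next;           /* p was the head; there is no tail pointer */
}

/* Put item p at the head of the list. */
static void
list_push(item_ptr *list, item_ptr p)
{
    items[p].prev = INVALID_PTR;    /* the head's "prev" is always invalid */
    items[p].next = *list;
    if (*list != INVALID_PTR)
        items[*list].prev = p;
    *list = p;
}

/* Count items by walking the "next" links from the head. */
static size_t
list_length(item_ptr list)
{
    size_t n = 0;

    for (; list != INVALID_PTR; list = items[list].next)
        n++;
    return n;
}
```

Moving an item from a free list to a used list, as AutoVacuumRequestWork does, is then a list_remove on one head pointer followed by a list_push on the other.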
@@ -22,6 +22,7 @@ typedef struct BrinOptions
{
int32 vl_len_; /* varlena header (do not touch directly!) */
BlockNumber pagesPerRange;
bool autosummarize;
} BrinOptions;
#define BRIN_DEFAULT_PAGES_PER_RANGE 128
@@ -29,5 +30,9 @@ typedef struct BrinOptions
((relation)->rd_options ? \
((BrinOptions *) (relation)->rd_options)->pagesPerRange : \
BRIN_DEFAULT_PAGES_PER_RANGE)
#define BrinGetAutoSummarize(relation) \
((relation)->rd_options ? \
((BrinOptions *) (relation)->rd_options)->autosummarize : \
false)
#endif /* BRIN_H */
@@ -53,6 +53,6 @@
*/
/* yyyymmddN */
-#define CATALOG_VERSION_NO 201703312
+#define CATALOG_VERSION_NO 201704011
#endif
@@ -564,6 +564,8 @@ DATA(insert OID = 335 ( brinhandler PGNSP PGUID 12 1 0 0 0 f f f f t f v s 1 0
DESCR("brin index access method handler");
DATA(insert OID = 3952 ( brin_summarize_new_values PGNSP PGUID 12 1 0 0 0 f f f f t f v s 1 0 23 "2205" _null_ _null_ _null_ _null_ _null_ brin_summarize_new_values _null_ _null_ _null_ ));
DESCR("brin: standalone scan new table pages");
DATA(insert OID = 3999 ( brin_summarize_range PGNSP PGUID 12 1 0 0 0 f f f f t f v s 2 0 23 "2205 20" _null_ _null_ _null_ _null_ _null_ brin_summarize_range _null_ _null_ _null_ ));
DESCR("brin: standalone scan new table pages");
DATA(insert OID = 338 ( amvalidate PGNSP PGUID 12 1 0 0 0 f f f f t f v s 1 0 16 "26" _null_ _null_ _null_ _null_ _null_ amvalidate _null_ _null_ _null_ ));
DESCR("validate an operator class");
......
@@ -14,6 +14,15 @@
#ifndef AUTOVACUUM_H
#define AUTOVACUUM_H
/*
* Other processes can request specific work from autovacuum, identified by
* AutoVacuumWorkItem elements.
*/
typedef enum
{
AVW_BRINSummarizeRange
} AutoVacuumWorkItemType;
/* GUC variables */
extern bool autovacuum_start_daemon;
@@ -60,6 +69,9 @@ extern void AutovacuumWorkerIAm(void);
extern void AutovacuumLauncherIAm(void);
#endif
extern void AutoVacuumRequestWork(AutoVacuumWorkItemType type,
Oid relationId, BlockNumber blkno);
/* shared memory stuff */
extern Size AutoVacuumShmemSize(void);
extern void AutoVacuumShmemInit(void);
......
@@ -406,3 +406,51 @@ SELECT brin_summarize_new_values('brinidx'); -- ok, no change expected
0
(1 row)
-- Test brin_summarize_range
CREATE TABLE brin_summarize (
value int
) WITH (fillfactor=10, autovacuum_enabled=false);
CREATE INDEX brin_summarize_idx ON brin_summarize USING brin (value) WITH (pages_per_range=2);
-- Fill a few pages
DO $$
DECLARE curtid tid;
BEGIN
LOOP
INSERT INTO brin_summarize VALUES (1) RETURNING ctid INTO curtid;
EXIT WHEN curtid > tid '(2, 0)';
END LOOP;
END;
$$;
-- summarize one range
SELECT brin_summarize_range('brin_summarize_idx', 0);
brin_summarize_range
----------------------
1
(1 row)
-- nothing: already summarized
SELECT brin_summarize_range('brin_summarize_idx', 1);
brin_summarize_range
----------------------
0
(1 row)
-- summarize one range
SELECT brin_summarize_range('brin_summarize_idx', 2);
brin_summarize_range
----------------------
1
(1 row)
-- nothing: page doesn't exist in table
SELECT brin_summarize_range('brin_summarize_idx', 4294967295);
brin_summarize_range
----------------------
0
(1 row)
-- invalid block number values
SELECT brin_summarize_range('brin_summarize_idx', -1);
ERROR: block number out of range: -1
SELECT brin_summarize_range('brin_summarize_idx', 4294967296);
ERROR: block number out of range: 4294967296
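The expected output above implies that the second argument, declared as a 64-bit integer in pg_proc (type OID 20), is accepted only when it fits in an unsigned 32-bit block number: 4294967295 passes and 4294967296 is rejected. A minimal sketch of such a check follows, assuming exactly that range; block_arg_in_range and MAX_BLOCK_ARG are hypothetical names, and this is not the code the commit adds to brin.c.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>

/* Assumption: the largest accepted value is 0xFFFFFFFF, as the test suggests. */
#define MAX_BLOCK_ARG INT64_C(4294967295)

/*
 * Return true and store the narrowed value in *out when blk fits in an
 * unsigned 32-bit block number; return false without touching *out otherwise.
 */
static bool
block_arg_in_range(int64_t blk, uint32_t *out)
{
    assert(out != NULL);

    if (blk < 0 || blk > MAX_BLOCK_ARG)
        return false;

    *out = (uint32_t) blk;
    return true;
}
```

Checking the range before narrowing matters: casting 4294967296 straight to a 32-bit type would silently wrap to block 0 instead of raising an error.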
@@ -409,3 +409,31 @@ UPDATE brintest SET textcol = '' WHERE textcol IS NOT NULL;
SELECT brin_summarize_new_values('brintest'); -- error, not an index
SELECT brin_summarize_new_values('tenk1_unique1'); -- error, not a BRIN index
SELECT brin_summarize_new_values('brinidx'); -- ok, no change expected
-- Test brin_summarize_range
CREATE TABLE brin_summarize (
value int
) WITH (fillfactor=10, autovacuum_enabled=false);
CREATE INDEX brin_summarize_idx ON brin_summarize USING brin (value) WITH (pages_per_range=2);
-- Fill a few pages
DO $$
DECLARE curtid tid;
BEGIN
LOOP
INSERT INTO brin_summarize VALUES (1) RETURNING ctid INTO curtid;
EXIT WHEN curtid > tid '(2, 0)';
END LOOP;
END;
$$;
-- summarize one range
SELECT brin_summarize_range('brin_summarize_idx', 0);
-- nothing: already summarized
SELECT brin_summarize_range('brin_summarize_idx', 1);
-- summarize one range
SELECT brin_summarize_range('brin_summarize_idx', 2);
-- nothing: page doesn't exist in table
SELECT brin_summarize_range('brin_summarize_idx', 4294967295);
-- invalid block number values
SELECT brin_summarize_range('brin_summarize_idx', -1);
SELECT brin_summarize_range('brin_summarize_idx', 4294967296);