Commit 3a5e2213 authored by Amit Kapila

Allow parallel vacuum to accumulate buffer usage.

Commit 40d964ec allowed the vacuum command to process indexes in parallel but
forgot to accumulate the buffer usage stats of the parallel workers.  This
commit makes the leader backend accumulate the buffer usage stats of all the
parallel workers.

Reported-by: Julien Rouhaud
Author: Sawada Masahiko
Reviewed-by: Dilip Kumar, Amit Kapila and Julien Rouhaud
Discussion: https://postgr.es/m/20200328151721.GB12854@nol
parent 17e03282
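In outline, the patch funnels each worker's buffer usage back to the leader through a per-worker slot in the parallel DSM segment. The fragment below is only a condensed sketch of that flow, stitched together from the hunks in the diff that follows (all identifiers come from the patch itself); it is not standalone, compilable code.

    /* Leader, begin_parallel_vacuum(): reserve and publish one BufferUsage slot per worker */
    shm_toc_estimate_chunk(&pcxt->estimator,
                           mul_size(sizeof(BufferUsage), pcxt->nworkers));
    shm_toc_estimate_keys(&pcxt->estimator, 1);
    buffer_usage = shm_toc_allocate(pcxt->toc,
                                    mul_size(sizeof(BufferUsage), pcxt->nworkers));
    shm_toc_insert(pcxt->toc, PARALLEL_VACUUM_KEY_BUFFER_USAGE, buffer_usage);
    lps->buffer_usage = buffer_usage;

    /* Worker, parallel_vacuum_main(): record its own usage into its slot */
    InstrStartParallelQuery();
    parallel_vacuum_index(indrels, stats, lvshared, dead_tuples, nindexes,
                          &vacrelstats);
    buffer_usage = shm_toc_lookup(toc, PARALLEL_VACUUM_KEY_BUFFER_USAGE, false);
    InstrEndParallelQuery(&buffer_usage[ParallelWorkerNumber]);

    /* Leader, lazy_parallel_vacuum_indexes(): wait, then fold every slot into its counters */
    WaitForParallelWorkersToFinish(lps->pcxt);
    for (i = 0; i < lps->pcxt->nworkers_launched; i++)
        InstrAccumParallelQuery(&lps->buffer_usage[i]);

The wait is what makes the accumulation safe: a worker fills its BufferUsage slot only in InstrEndParallelQuery(), so the leader must not read the array until WaitForParallelWorkersToFinish() has returned.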
@@ -65,6 +65,7 @@
 #include "commands/dbcommands.h"
 #include "commands/progress.h"
 #include "commands/vacuum.h"
+#include "executor/instrument.h"
 #include "miscadmin.h"
 #include "optimizer/paths.h"
 #include "pgstat.h"
@@ -137,6 +138,7 @@
 #define PARALLEL_VACUUM_KEY_SHARED			1
 #define PARALLEL_VACUUM_KEY_DEAD_TUPLES		2
 #define PARALLEL_VACUUM_KEY_QUERY_TEXT		3
+#define PARALLEL_VACUUM_KEY_BUFFER_USAGE	4
 
 /*
  * Macro to check if we are in a parallel vacuum.  If true, we are in the
@@ -270,6 +272,9 @@ typedef struct LVParallelState
 	/* Shared information among parallel vacuum workers */
 	LVShared   *lvshared;
 
+	/* Points to buffer usage area in DSM */
+	BufferUsage *buffer_usage;
+
 	/*
 	 * The number of indexes that support parallel index bulk-deletion and
 	 * parallel index cleanup respectively.
@@ -2137,9 +2142,21 @@ lazy_parallel_vacuum_indexes(Relation *Irel, IndexBulkDeleteResult **stats,
 	parallel_vacuum_index(Irel, stats, lps->lvshared,
 						  vacrelstats->dead_tuples, nindexes, vacrelstats);
 
-	/* Wait for all vacuum workers to finish */
-	WaitForParallelWorkersToFinish(lps->pcxt);
+	/*
+	 * Next, accumulate buffer usage.  (This must wait for the workers to
+	 * finish, or we might get incomplete data.)
+	 */
+	if (nworkers > 0)
+	{
+		int			i;
+
+		/* Wait for all vacuum workers to finish */
+		WaitForParallelWorkersToFinish(lps->pcxt);
+
+		for (i = 0; i < lps->pcxt->nworkers_launched; i++)
+			InstrAccumParallelQuery(&lps->buffer_usage[i]);
+	}
 
 	/*
 	 * Carry the shared balance value to heap scan and disable shared costing
 	 */
@@ -3153,6 +3170,7 @@ begin_parallel_vacuum(Oid relid, Relation *Irel, LVRelStats *vacrelstats,
 	ParallelContext *pcxt;
 	LVShared   *shared;
 	LVDeadTuples *dead_tuples;
+	BufferUsage *buffer_usage;
 	bool	   *can_parallel_vacuum;
 	long		maxtuples;
 	char	   *sharedquery;
@@ -3236,6 +3254,17 @@ begin_parallel_vacuum(Oid relid, Relation *Irel, LVRelStats *vacrelstats,
 	shm_toc_estimate_chunk(&pcxt->estimator, est_deadtuples);
 	shm_toc_estimate_keys(&pcxt->estimator, 1);
 
+	/*
+	 * Estimate space for BufferUsage -- PARALLEL_VACUUM_KEY_BUFFER_USAGE.
+	 *
+	 * If there are no extensions loaded that care, we could skip this.  We
+	 * have no way of knowing whether anyone's looking at pgBufferUsage, so do
+	 * it unconditionally.
+	 */
+	shm_toc_estimate_chunk(&pcxt->estimator,
+						   mul_size(sizeof(BufferUsage), pcxt->nworkers));
+	shm_toc_estimate_keys(&pcxt->estimator, 1);
+
 	/* Finally, estimate PARALLEL_VACUUM_KEY_QUERY_TEXT space */
 	querylen = strlen(debug_query_string);
 	shm_toc_estimate_chunk(&pcxt->estimator, querylen + 1);
@@ -3270,6 +3299,12 @@ begin_parallel_vacuum(Oid relid, Relation *Irel, LVRelStats *vacrelstats,
 	shm_toc_insert(pcxt->toc, PARALLEL_VACUUM_KEY_DEAD_TUPLES, dead_tuples);
 	vacrelstats->dead_tuples = dead_tuples;
 
+	/* Allocate space for each worker's BufferUsage; no need to initialize */
+	buffer_usage = shm_toc_allocate(pcxt->toc,
+									mul_size(sizeof(BufferUsage), pcxt->nworkers));
+	shm_toc_insert(pcxt->toc, PARALLEL_VACUUM_KEY_BUFFER_USAGE, buffer_usage);
+	lps->buffer_usage = buffer_usage;
+
 	/* Store query string for workers */
 	sharedquery = (char *) shm_toc_allocate(pcxt->toc, querylen + 1);
 	memcpy(sharedquery, debug_query_string, querylen + 1);
@@ -3399,6 +3434,7 @@ parallel_vacuum_main(dsm_segment *seg, shm_toc *toc)
 	Relation   *indrels;
 	LVShared   *lvshared;
 	LVDeadTuples *dead_tuples;
+	BufferUsage *buffer_usage;
 	int			nindexes;
 	char	   *sharedquery;
 	IndexBulkDeleteResult **stats;
@@ -3468,10 +3504,17 @@ parallel_vacuum_main(dsm_segment *seg, shm_toc *toc)
 	errcallback.previous = error_context_stack;
 	error_context_stack = &errcallback;
 
+	/* Prepare to track buffer usage during parallel execution */
+	InstrStartParallelQuery();
+
 	/* Process indexes to perform vacuum/cleanup */
 	parallel_vacuum_index(indrels, stats, lvshared, dead_tuples, nindexes,
 						  &vacrelstats);
 
+	/* Report buffer usage during parallel execution */
+	buffer_usage = shm_toc_lookup(toc, PARALLEL_VACUUM_KEY_BUFFER_USAGE, false);
+	InstrEndParallelQuery(&buffer_usage[ParallelWorkerNumber]);
+
 	/* Pop the error context stack */
 	error_context_stack = errcallback.previous;