Commit 8526bcb2 authored by Robert Haas

Try again to fix accumulation of parallel worker instrumentation.

When a Gather or Gather Merge node is started and stopped multiple
times, accumulate instrumentation data only once, at the end, instead
of after each execution, to avoid recording inflated totals.

Commit 778e78ae9fa51e58f41cbdc72b293291d02d8984, the previous attempt
at a fix, instead reset the state after every execution, which worked
for the general instrumentation data but had problems for the additional
instrumentation specific to Sort and Hash nodes.

Report by hubert depesz lubaczewski.  Analysis and fix by Amit Kapila,
following a design proposal from Thomas Munro, with a comment tweak
by me.

Discussion: http://postgr.es/m/20171127175631.GA405@depesz.com
parent 38fc5470
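To make the inflation concrete, here is a small standalone C sketch (not PostgreSQL code; the worker count and the per-execution figure of 100 are invented). Because the per-worker counters in shared memory are running totals, folding them into the leader's totals after every execution counts earlier executions again, while folding them in once at the end does not.

/*
 * Standalone illustration of the double-counting (not PostgreSQL code;
 * NWORKERS, NEXECS and the per-execution value 100 are made up).
 */
#include <stdio.h>

#define NWORKERS  2
#define NEXECS    3

static double shared_total[NWORKERS];   /* running totals, as kept in shared memory */
static double leader_total;             /* what EXPLAIN ANALYZE would report */

static void
run_one_execution(void)
{
    /* pretend each worker does 100 units of work per execution */
    for (int w = 0; w < NWORKERS; w++)
        shared_total[w] += 100.0;
}

static void
accumulate(void)
{
    /* fold the workers' running totals into the leader's totals */
    for (int w = 0; w < NWORKERS; w++)
        leader_total += shared_total[w];
}

static void
reset_all(void)
{
    leader_total = 0.0;
    for (int w = 0; w < NWORKERS; w++)
        shared_total[w] = 0.0;
}

int
main(void)
{
    /* Buggy pattern: accumulate after every execution (inflated totals). */
    reset_all();
    for (int e = 0; e < NEXECS; e++)
    {
        run_one_execution();
        accumulate();
    }
    printf("accumulate per execution: %.0f\n", leader_total);   /* prints 1200 */

    /* Fixed pattern: accumulate only once, after the last execution. */
    reset_all();
    for (int e = 0; e < NEXECS; e++)
        run_one_execution();
    accumulate();
    printf("accumulate once at the end: %.0f\n", leader_total); /* prints 600 */

    return 0;
}

The correct total for 2 workers over 3 executions is 600; accumulating after each execution reports 1200, which is the kind of inflation the regression-test changes below are meant to catch.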
@@ -899,12 +899,8 @@ ExecParallelReInitializeDSM(PlanState *planstate,
                                           pcxt);
             break;
         case T_HashState:
-            /* even when not parallel-aware, for EXPLAIN ANALYZE */
-            ExecHashReInitializeDSM((HashState *) planstate, pcxt);
-            break;
         case T_SortState:
-            /* even when not parallel-aware, for EXPLAIN ANALYZE */
-            ExecSortReInitializeDSM((SortState *) planstate, pcxt);
+            /* these nodes have DSM state, but no reinitialization is required */
             break;
 
         default:
@@ -977,7 +973,7 @@ ExecParallelRetrieveInstrumentation(PlanState *planstate,
 
 /*
  * Finish parallel execution.  We wait for parallel workers to finish, and
- * accumulate their buffer usage and instrumentation.
+ * accumulate their buffer usage.
  */
 void
 ExecParallelFinish(ParallelExecutorInfo *pei)
@@ -1023,23 +1019,23 @@ ExecParallelFinish(ParallelExecutorInfo *pei)
     for (i = 0; i < nworkers; i++)
         InstrAccumParallelQuery(&pei->buffer_usage[i]);
 
-    /* Finally, accumulate instrumentation, if any. */
-    if (pei->instrumentation)
-        ExecParallelRetrieveInstrumentation(pei->planstate,
-                                            pei->instrumentation);
-
     pei->finished = true;
 }
 
 /*
- * Clean up whatever ParallelExecutorInfo resources still exist after
- * ExecParallelFinish.  We separate these routines because someone might
- * want to examine the contents of the DSM after ExecParallelFinish and
- * before calling this routine.
+ * Accumulate instrumentation, and then clean up whatever ParallelExecutorInfo
+ * resources still exist after ExecParallelFinish.  We separate these
+ * routines because someone might want to examine the contents of the DSM
+ * after ExecParallelFinish and before calling this routine.
  */
 void
 ExecParallelCleanup(ParallelExecutorInfo *pei)
 {
+    /* Accumulate instrumentation, if any. */
+    if (pei->instrumentation)
+        ExecParallelRetrieveInstrumentation(pei->planstate,
+                                            pei->instrumentation);
+
     /* Free any serialized parameters. */
     if (DsaPointerIsValid(pei->param_exec))
     {
...
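Why this reshuffle is enough: the commit message notes that the node can be started and stopped multiple times, so the finish step above may run repeatedly, while the cleanup step runs only once at the very end. A rough, hypothetical stub sketch of that ordering (these stand-ins are not the real executor entry points):

/*
 * Hypothetical stubs illustrating the call ordering; not the real
 * executor functions.
 */
#include <stdio.h>

static void finish_stub(void)   /* plays the role of ExecParallelFinish */
{
    printf("finish: wait for workers, accumulate buffer usage\n");
}

static void cleanup_stub(void)  /* plays the role of ExecParallelCleanup */
{
    printf("cleanup: accumulate instrumentation once, then free resources\n");
}

int
main(void)
{
    /* the parallel node may be started and stopped several times... */
    for (int rescan = 0; rescan < 3; rescan++)
        finish_stub();

    /* ...but is cleaned up exactly once at the end */
    cleanup_stub();
    return 0;
}

Because the cleanup step runs only once, anything accumulated there is counted exactly once no matter how many rescans happened.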
@@ -1669,19 +1669,6 @@ ExecHashInitializeDSM(HashState *node, ParallelContext *pcxt)
                     node->shared_info);
 }
 
-/*
- * Reset shared state before beginning a fresh scan.
- */
-void
-ExecHashReInitializeDSM(HashState *node, ParallelContext *pcxt)
-{
-    if (node->shared_info != NULL)
-    {
-        memset(node->shared_info->hinstrument, 0,
-               node->shared_info->num_workers * sizeof(HashInstrumentation));
-    }
-}
-
 /*
  * Locate the DSM space for hash table instrumentation data that we'll write
  * to at shutdown time.
...
@@ -396,23 +396,6 @@ ExecSortInitializeDSM(SortState *node, ParallelContext *pcxt)
                     node->shared_info);
 }
 
-/* ----------------------------------------------------------------
- *      ExecSortReInitializeDSM
- *
- *      Reset shared state before beginning a fresh scan.
- * ----------------------------------------------------------------
- */
-void
-ExecSortReInitializeDSM(SortState *node, ParallelContext *pcxt)
-{
-    /* If there's any instrumentation space, clear it for next time */
-    if (node->shared_info != NULL)
-    {
-        memset(node->shared_info->sinstrument, 0,
-               node->shared_info->num_workers * sizeof(TuplesortInstrumentation));
-    }
-}
-
 /* ----------------------------------------------------------------
  *      ExecSortInitializeWorker
  *
...
@@ -52,7 +52,6 @@ extern int ExecHashGetSkewBucket(HashJoinTable hashtable, uint32 hashvalue);
 extern void ExecHashEstimate(HashState *node, ParallelContext *pcxt);
 extern void ExecHashInitializeDSM(HashState *node, ParallelContext *pcxt);
 extern void ExecHashInitializeWorker(HashState *node, ParallelWorkerContext *pwcxt);
-extern void ExecHashReInitializeDSM(HashState *node, ParallelContext *pcxt);
 extern void ExecHashRetrieveInstrumentation(HashState *node);
 extern void ExecShutdownHash(HashState *node);
 extern void ExecHashGetInstrumentation(HashInstrumentation *instrument,
...
@@ -26,7 +26,6 @@ extern void ExecReScanSort(SortState *node);
 
 /* parallel instrumentation support */
 extern void ExecSortEstimate(SortState *node, ParallelContext *pcxt);
 extern void ExecSortInitializeDSM(SortState *node, ParallelContext *pcxt);
-extern void ExecSortReInitializeDSM(SortState *node, ParallelContext *pcxt);
 extern void ExecSortInitializeWorker(SortState *node, ParallelWorkerContext *pwcxt);
 extern void ExecSortRetrieveInstrumentation(SortState *node);
...
@@ -465,14 +465,71 @@ select count(*) from bmscantest where a>1;
  99999
 (1 row)
 
+-- test accumulation of stats for parallel nodes
 reset enable_seqscan;
+alter table tenk2 set (parallel_workers = 0);
+explain (analyze, timing off, summary off, costs off)
+   select count(*) from tenk1, tenk2 where tenk1.hundred > 1
+        and tenk2.thousand=0;
+                                QUERY PLAN
+--------------------------------------------------------------------------
+ Aggregate (actual rows=1 loops=1)
+   ->  Nested Loop (actual rows=98000 loops=1)
+         ->  Seq Scan on tenk2 (actual rows=10 loops=1)
+               Filter: (thousand = 0)
+               Rows Removed by Filter: 9990
+         ->  Gather (actual rows=9800 loops=10)
+               Workers Planned: 4
+               Workers Launched: 4
+               ->  Parallel Seq Scan on tenk1 (actual rows=1960 loops=50)
+                     Filter: (hundred > 1)
+                     Rows Removed by Filter: 40
+(11 rows)
+
+alter table tenk2 reset (parallel_workers);
+reset work_mem;
+create function explain_parallel_sort_stats() returns setof text
+language plpgsql as
+$$
+declare ln text;
+begin
+    for ln in
+        explain (analyze, timing off, summary off, costs off)
+          select * from
+          (select ten from tenk1 where ten < 100 order by ten) ss
+          right join (values (1),(2),(3)) v(x) on true
+    loop
+        ln := regexp_replace(ln, 'Memory: \S*', 'Memory: xxx');
+        return next ln;
+    end loop;
+end;
+$$;
+select * from explain_parallel_sort_stats();
+                explain_parallel_sort_stats
+--------------------------------------------------------------------------
+ Nested Loop Left Join (actual rows=30000 loops=1)
+   ->  Values Scan on "*VALUES*" (actual rows=3 loops=1)
+   ->  Gather Merge (actual rows=10000 loops=3)
+         Workers Planned: 4
+         Workers Launched: 4
+         ->  Sort (actual rows=2000 loops=15)
+               Sort Key: tenk1.ten
+               Sort Method: quicksort  Memory: xxx
+               Worker 0:  Sort Method: quicksort  Memory: xxx
+               Worker 1:  Sort Method: quicksort  Memory: xxx
+               Worker 2:  Sort Method: quicksort  Memory: xxx
+               Worker 3:  Sort Method: quicksort  Memory: xxx
+         ->  Parallel Seq Scan on tenk1 (actual rows=2000 loops=15)
+               Filter: (ten < 100)
+(14 rows)
+
 reset enable_indexscan;
 reset enable_hashjoin;
 reset enable_mergejoin;
 reset enable_material;
 reset effective_io_concurrency;
+reset work_mem;
 drop table bmscantest;
+drop function explain_parallel_sort_stats();
 -- test parallel merge join path.
 set enable_hashjoin to off;
 set enable_nestloop to off;
...
@@ -179,14 +179,40 @@ insert into bmscantest select r, 'fooooooooooooooooooooooooooooooooooooooooooooo
 create index i_bmtest ON bmscantest(a);
 select count(*) from bmscantest where a>1;
 
+-- test accumulation of stats for parallel nodes
 reset enable_seqscan;
+alter table tenk2 set (parallel_workers = 0);
+explain (analyze, timing off, summary off, costs off)
+   select count(*) from tenk1, tenk2 where tenk1.hundred > 1
+        and tenk2.thousand=0;
+alter table tenk2 reset (parallel_workers);
+reset work_mem;
+create function explain_parallel_sort_stats() returns setof text
+language plpgsql as
+$$
+declare ln text;
+begin
+    for ln in
+        explain (analyze, timing off, summary off, costs off)
+          select * from
+          (select ten from tenk1 where ten < 100 order by ten) ss
+          right join (values (1),(2),(3)) v(x) on true
+    loop
+        ln := regexp_replace(ln, 'Memory: \S*', 'Memory: xxx');
+        return next ln;
+    end loop;
+end;
+$$;
+select * from explain_parallel_sort_stats();
 reset enable_indexscan;
 reset enable_hashjoin;
 reset enable_mergejoin;
 reset enable_material;
 reset effective_io_concurrency;
+reset work_mem;
 drop table bmscantest;
+drop function explain_parallel_sort_stats();
 -- test parallel merge join path.
 set enable_hashjoin to off;
...