Commit e5253fdc authored by Robert Haas's avatar Robert Haas

Add parallel_leader_participation GUC.

Sometimes, for testing, it's useful to have the leader do nothing but
read tuples from workers; and it's possible that could work out better
even in production.

Thomas Munro, reviewed by Amit Kapila and by me.  A few final tweaks
by me.

Discussion: http://postgr.es/m/CAEepm=2U++Lp3bNTv2Bv_kkr5NE2pOyHhxU=G0YTa4ZhSYhHiw@mail.gmail.com
parent 75180499
......@@ -4265,6 +4265,32 @@ SELECT * FROM parent WHERE key = 2400;
</listitem>
</varlistentry>
<varlistentry id="guc-parallel-leader-participation" xreflabel="parallel_leader_participation">
<term>
<varname>parallel_leader_participation</varname> (<type>boolean</type>)
<indexterm>
<primary>
<varname>parallel_leader_participation</varname> configuration
parameter
</primary>
</indexterm>
</term>
<listitem>
<para>
Allows the leader process to execute the query plan under
<literal>Gather</literal> and <literal>Gather Merge</literal> nodes
instead of waiting for worker processes. The default is
<literal>on</literal>. Setting this value to <literal>off</literal>
reduces the likelihood that workers will become blocked because the
leader is not reading tuples fast enough, but requires the leader
process to wait for worker processes to start up before the first
tuples can be produced. The degree to which the leader can help or
hinder performance depends on the plan type, number of workers and
query duration.
</para>
</listitem>
</varlistentry>
<varlistentry id="guc-force-parallel-mode" xreflabel="force_parallel_mode">
<term><varname>force_parallel_mode</varname> (<type>enum</type>)
<indexterm>
......
......@@ -38,6 +38,7 @@
#include "executor/nodeSubplan.h"
#include "executor/tqueue.h"
#include "miscadmin.h"
#include "optimizer/planmain.h"
#include "pgstat.h"
#include "utils/memutils.h"
#include "utils/rel.h"
......@@ -73,7 +74,8 @@ ExecInitGather(Gather *node, EState *estate, int eflags)
gatherstate->ps.ExecProcNode = ExecGather;
gatherstate->initialized = false;
gatherstate->need_to_scan_locally = !node->single_copy;
gatherstate->need_to_scan_locally =
!node->single_copy && parallel_leader_participation;
gatherstate->tuples_needed = -1;
/*
......@@ -193,9 +195,9 @@ ExecGather(PlanState *pstate)
node->nextreader = 0;
}
/* Run plan locally if no workers or not single-copy. */
/* Run plan locally if no workers or enabled and not single-copy. */
node->need_to_scan_locally = (node->nreaders == 0)
|| !gather->single_copy;
|| (!gather->single_copy && parallel_leader_participation);
node->initialized = true;
}
......
......@@ -23,6 +23,7 @@
#include "executor/tqueue.h"
#include "lib/binaryheap.h"
#include "miscadmin.h"
#include "optimizer/planmain.h"
#include "utils/memutils.h"
#include "utils/rel.h"
......@@ -233,7 +234,8 @@ ExecGatherMerge(PlanState *pstate)
}
}
/* always allow leader to participate */
/* allow leader to participate if enabled or no choice */
if (parallel_leader_participation || node->nreaders == 0)
node->need_to_scan_locally = true;
node->initialized = true;
}
......
......@@ -5137,7 +5137,6 @@ static double
get_parallel_divisor(Path *path)
{
double parallel_divisor = path->parallel_workers;
double leader_contribution;
/*
* Early experience with parallel query suggests that when there is only
......@@ -5150,9 +5149,14 @@ get_parallel_divisor(Path *path)
* its time servicing each worker, and the remainder executing the
* parallel plan.
*/
if (parallel_leader_participation)
{
double leader_contribution;
leader_contribution = 1.0 - (0.3 * path->parallel_workers);
if (leader_contribution > 0)
parallel_divisor += leader_contribution;
}
return parallel_divisor;
}
......
......@@ -61,6 +61,7 @@
/* GUC parameters */
double cursor_tuple_fraction = DEFAULT_CURSOR_TUPLE_FRACTION;
int force_parallel_mode = FORCE_PARALLEL_OFF;
bool parallel_leader_participation = true;
/* Hook for plugins to get control in planner() */
planner_hook_type planner_hook = NULL;
......
......@@ -1676,6 +1676,16 @@ static struct config_bool ConfigureNamesBool[] =
NULL, NULL, NULL
},
{
{"parallel_leader_participation", PGC_USERSET, RESOURCES_ASYNCHRONOUS,
gettext_noop("Controls whether Gather and Gather Merge also run subplans."),
gettext_noop("Should gather nodes also run subplans, or just gather tuples?")
},
&parallel_leader_participation,
true,
NULL, NULL, NULL
},
/* End-of-list marker */
{
{NULL, 0, 0, NULL, NULL}, NULL, false, NULL, NULL, NULL
......
......@@ -163,6 +163,7 @@
#effective_io_concurrency = 1 # 1-1000; 0 disables prefetching
#max_worker_processes = 8 # (change requires restart)
#max_parallel_workers_per_gather = 2 # taken from max_parallel_workers
#parallel_leader_particulation = on
#max_parallel_workers = 8 # maximum number of max_worker_processes that
# can be used in parallel queries
#old_snapshot_threshold = -1 # 1min-60d; -1 disables; 0 is immediate
......
......@@ -29,6 +29,7 @@ typedef enum
#define DEFAULT_CURSOR_TUPLE_FRACTION 0.1
extern double cursor_tuple_fraction;
extern int force_parallel_mode;
extern bool parallel_leader_participation;
/* query_planner callback to compute query_pathkeys */
typedef void (*query_pathkeys_callback) (PlannerInfo *root, void *extra);
......
......@@ -34,6 +34,49 @@ select count(*) from a_star;
50
(1 row)
-- test with leader participation disabled
set parallel_leader_participation = off;
explain (costs off)
select count(*) from tenk1 where stringu1 = 'GRAAAA';
QUERY PLAN
---------------------------------------------------------
Finalize Aggregate
-> Gather
Workers Planned: 4
-> Partial Aggregate
-> Parallel Seq Scan on tenk1
Filter: (stringu1 = 'GRAAAA'::name)
(6 rows)
select count(*) from tenk1 where stringu1 = 'GRAAAA';
count
-------
15
(1 row)
-- test with leader participation disabled, but no workers available (so
-- the leader will have to run the plan despite the setting)
set max_parallel_workers = 0;
explain (costs off)
select count(*) from tenk1 where stringu1 = 'GRAAAA';
QUERY PLAN
---------------------------------------------------------
Finalize Aggregate
-> Gather
Workers Planned: 4
-> Partial Aggregate
-> Parallel Seq Scan on tenk1
Filter: (stringu1 = 'GRAAAA'::name)
(6 rows)
select count(*) from tenk1 where stringu1 = 'GRAAAA';
count
-------
15
(1 row)
reset max_parallel_workers;
reset parallel_leader_participation;
-- test that parallel_restricted function doesn't run in worker
alter table tenk1 set (parallel_workers = 4);
explain (verbose, costs off)
......@@ -400,6 +443,49 @@ explain (costs off, verbose)
(11 rows)
drop function simple_func(integer);
-- test gather merge with parallel leader participation disabled
set parallel_leader_participation = off;
explain (costs off)
select count(*) from tenk1 group by twenty;
QUERY PLAN
----------------------------------------------------
Finalize GroupAggregate
Group Key: twenty
-> Gather Merge
Workers Planned: 4
-> Partial GroupAggregate
Group Key: twenty
-> Sort
Sort Key: twenty
-> Parallel Seq Scan on tenk1
(9 rows)
select count(*) from tenk1 group by twenty;
count
-------
500
500
500
500
500
500
500
500
500
500
500
500
500
500
500
500
500
500
500
500
(20 rows)
reset parallel_leader_participation;
--test rescan behavior of gather merge
set enable_material = false;
explain (costs off)
......@@ -508,6 +594,33 @@ select string4 from tenk1 order by string4 limit 5;
AAAAxx
(5 rows)
-- gather merge test with 0 workers, with parallel leader
-- participation disabled (the leader will have to run the plan
-- despite the setting)
set parallel_leader_participation = off;
explain (costs off)
select string4 from tenk1 order by string4 limit 5;
QUERY PLAN
----------------------------------------------
Limit
-> Gather Merge
Workers Planned: 4
-> Sort
Sort Key: string4
-> Parallel Seq Scan on tenk1
(6 rows)
select string4 from tenk1 order by string4 limit 5;
string4
---------
AAAAxx
AAAAxx
AAAAxx
AAAAxx
AAAAxx
(5 rows)
reset parallel_leader_participation;
reset max_parallel_workers;
SAVEPOINT settings;
SET LOCAL force_parallel_mode = 1;
......
......@@ -19,6 +19,22 @@ explain (costs off)
select count(*) from a_star;
select count(*) from a_star;
-- test with leader participation disabled
set parallel_leader_participation = off;
explain (costs off)
select count(*) from tenk1 where stringu1 = 'GRAAAA';
select count(*) from tenk1 where stringu1 = 'GRAAAA';
-- test with leader participation disabled, but no workers available (so
-- the leader will have to run the plan despite the setting)
set max_parallel_workers = 0;
explain (costs off)
select count(*) from tenk1 where stringu1 = 'GRAAAA';
select count(*) from tenk1 where stringu1 = 'GRAAAA';
reset max_parallel_workers;
reset parallel_leader_participation;
-- test that parallel_restricted function doesn't run in worker
alter table tenk1 set (parallel_workers = 4);
explain (verbose, costs off)
......@@ -157,6 +173,16 @@ explain (costs off, verbose)
drop function simple_func(integer);
-- test gather merge with parallel leader participation disabled
set parallel_leader_participation = off;
explain (costs off)
select count(*) from tenk1 group by twenty;
select count(*) from tenk1 group by twenty;
reset parallel_leader_participation;
--test rescan behavior of gather merge
set enable_material = false;
......@@ -192,6 +218,16 @@ set max_parallel_workers = 0;
explain (costs off)
select string4 from tenk1 order by string4 limit 5;
select string4 from tenk1 order by string4 limit 5;
-- gather merge test with 0 workers, with parallel leader
-- participation disabled (the leader will have to run the plan
-- despite the setting)
set parallel_leader_participation = off;
explain (costs off)
select string4 from tenk1 order by string4 limit 5;
select string4 from tenk1 order by string4 limit 5;
reset parallel_leader_participation;
reset max_parallel_workers;
SAVEPOINT settings;
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment