Commit 3452dc52 authored by Robert Haas

Push tuple limits through Gather and Gather Merge.

If we only need, say, 10 tuples in total, then we certainly don't need
more than 10 tuples from any single process.  Pushing down the limit
lets workers exit early when possible.  For Gather Merge, there is
an additional benefit: a Sort immediately below the Gather Merge can
be done as a bounded sort if there is an applicable limit.

Robert Haas and Tom Lane

Discussion: http://postgr.es/m/CA+TgmoYa3QKKrLj5rX7UvGqhH73G1Li4B-EKxrmASaca2tFu9Q@mail.gmail.com
parent ce5dcf54
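
The standalone sketch below is a toy model, not PostgreSQL code: the struct, its fields, and set_tuple_bound() are invented for illustration, and rescans, worker launch, and the shared-memory handoff are all omitted. It only shows the shape of the recursion that the new ExecSetTupleBound() performs, as changed in the diff that follows: a known limit at the top (say LIMIT 4) is pushed down through nodes that cannot discard or combine rows, so a Sort under a Gather Merge can run as a bounded top-N sort and no worker ever needs to produce more than that many tuples.

#include <stdio.h>

typedef enum { T_GATHER_MERGE, T_SORT, T_SEQSCAN } NodeTag;

typedef struct PlanNode
{
    NodeTag          tag;
    struct PlanNode *child;     /* single input; NULL for a leaf */
    long long        bound;     /* < 0 means "no limit" */
    int              bounded;   /* Sort only: top-N sort enabled? */
} PlanNode;

/* Push a tuple bound down the tree, mimicking ExecSetTupleBound's recursion. */
static void
set_tuple_bound(long long tuples_needed, PlanNode *node)
{
    if (node == NULL)
        return;

    switch (node->tag)
    {
        case T_SORT:
            if (tuples_needed < 0)
                node->bounded = 0;          /* reset if rescan removes the limit */
            else
            {
                node->bounded = 1;          /* can use a bounded (top-N) sort */
                node->bound = tuples_needed;
            }
            break;

        case T_GATHER_MERGE:
            /*
             * Remember the bound so it can be shipped to each worker, and push
             * it into the local copy of the child plan as well: no one process
             * needs more tuples than the node above it.
             */
            node->bound = tuples_needed;
            set_tuple_bound(tuples_needed, node->child);
            break;

        default:
            /* A node that might discard or combine rows: stop propagating. */
            break;
    }
}

int
main(void)
{
    PlanNode scan = {T_SEQSCAN, NULL, -1, 0};
    PlanNode sort = {T_SORT, &scan, -1, 0};
    PlanNode gm   = {T_GATHER_MERGE, &sort, -1, 0};

    /* A "LIMIT 4" above the Gather Merge needs at most 4 tuples in total. */
    set_tuple_bound(4, &gm);

    printf("gather merge bound: %lld\n", gm.bound);                  /* 4 */
    printf("sort bounded: %d, bound: %lld\n", sort.bounded, sort.bound); /* 1, 4 */
    return 0;
}
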
src/backend/executor/execParallel.c

@@ -47,16 +47,25 @@
  * greater than any 32-bit integer here so that values < 2^32 can be used
  * by individual parallel nodes to store their own state.
  */
-#define PARALLEL_KEY_PLANNEDSTMT		UINT64CONST(0xE000000000000001)
-#define PARALLEL_KEY_PARAMS			UINT64CONST(0xE000000000000002)
-#define PARALLEL_KEY_BUFFER_USAGE		UINT64CONST(0xE000000000000003)
-#define PARALLEL_KEY_TUPLE_QUEUE		UINT64CONST(0xE000000000000004)
-#define PARALLEL_KEY_INSTRUMENTATION	UINT64CONST(0xE000000000000005)
-#define PARALLEL_KEY_DSA				UINT64CONST(0xE000000000000006)
-#define PARALLEL_KEY_QUERY_TEXT		UINT64CONST(0xE000000000000007)
+#define PARALLEL_KEY_EXECUTOR_FIXED	UINT64CONST(0xE000000000000001)
+#define PARALLEL_KEY_PLANNEDSTMT		UINT64CONST(0xE000000000000002)
+#define PARALLEL_KEY_PARAMS			UINT64CONST(0xE000000000000003)
+#define PARALLEL_KEY_BUFFER_USAGE		UINT64CONST(0xE000000000000004)
+#define PARALLEL_KEY_TUPLE_QUEUE		UINT64CONST(0xE000000000000005)
+#define PARALLEL_KEY_INSTRUMENTATION	UINT64CONST(0xE000000000000006)
+#define PARALLEL_KEY_DSA				UINT64CONST(0xE000000000000007)
+#define PARALLEL_KEY_QUERY_TEXT		UINT64CONST(0xE000000000000008)
 
 #define PARALLEL_TUPLE_QUEUE_SIZE		65536
 
+/*
+ * Fixed-size random stuff that we need to pass to parallel workers.
+ */
+typedef struct FixedParallelExecutorState
+{
+	int64		tuples_needed;	/* tuple bound, see ExecSetTupleBound */
+} FixedParallelExecutorState;
+
 /*
  * DSM structure for accumulating per-PlanState instrumentation.
  *
@@ -381,12 +390,14 @@ ExecParallelReinitialize(ParallelExecutorInfo *pei)
  * execution and return results to the main backend.
  */
 ParallelExecutorInfo *
-ExecInitParallelPlan(PlanState *planstate, EState *estate, int nworkers)
+ExecInitParallelPlan(PlanState *planstate, EState *estate, int nworkers,
+					 int64 tuples_needed)
 {
 	ParallelExecutorInfo *pei;
 	ParallelContext *pcxt;
 	ExecParallelEstimateContext e;
 	ExecParallelInitializeDSMContext d;
+	FixedParallelExecutorState *fpes;
 	char	   *pstmt_data;
 	char	   *pstmt_space;
 	char	   *param_space;
@@ -418,6 +429,11 @@ ExecInitParallelPlan(PlanState *planstate, EState *estate, int nworkers)
 	 * for the various things we need to store.
 	 */
 
+	/* Estimate space for fixed-size state. */
+	shm_toc_estimate_chunk(&pcxt->estimator,
+						   sizeof(FixedParallelExecutorState));
+	shm_toc_estimate_keys(&pcxt->estimator, 1);
+
 	/* Estimate space for query text. */
 	query_len = strlen(estate->es_sourceText);
 	shm_toc_estimate_chunk(&pcxt->estimator, query_len);
@@ -487,6 +503,11 @@ ExecInitParallelPlan(PlanState *planstate, EState *estate, int nworkers)
 	 * asked for has been allocated or initialized yet, though, so do that.
 	 */
 
+	/* Store fixed-size state. */
+	fpes = shm_toc_allocate(pcxt->toc, sizeof(FixedParallelExecutorState));
+	fpes->tuples_needed = tuples_needed;
+	shm_toc_insert(pcxt->toc, PARALLEL_KEY_EXECUTOR_FIXED, fpes);
+
 	/* Store query string */
 	query_string = shm_toc_allocate(pcxt->toc, query_len);
 	memcpy(query_string, estate->es_sourceText, query_len);
@@ -833,6 +854,7 @@ ExecParallelInitializeWorker(PlanState *planstate, shm_toc *toc)
 void
 ParallelQueryMain(dsm_segment *seg, shm_toc *toc)
 {
+	FixedParallelExecutorState *fpes;
 	BufferUsage *buffer_usage;
 	DestReceiver *receiver;
 	QueryDesc  *queryDesc;
@@ -841,6 +863,9 @@ ParallelQueryMain(dsm_segment *seg, shm_toc *toc)
 	void	   *area_space;
 	dsa_area   *area;
 
+	/* Get fixed-size state. */
+	fpes = shm_toc_lookup(toc, PARALLEL_KEY_EXECUTOR_FIXED, false);
+
 	/* Set up DestReceiver, SharedExecutorInstrumentation, and QueryDesc. */
 	receiver = ExecParallelGetReceiver(seg, toc);
 	instrumentation = shm_toc_lookup(toc, PARALLEL_KEY_INSTRUMENTATION, true);
@@ -868,8 +893,17 @@ ParallelQueryMain(dsm_segment *seg, shm_toc *toc)
 	queryDesc->planstate->state->es_query_dsa = area;
 	ExecParallelInitializeWorker(queryDesc->planstate, toc);
 
-	/* Run the plan */
-	ExecutorRun(queryDesc, ForwardScanDirection, 0L, true);
+	/* Pass down any tuple bound */
+	ExecSetTupleBound(fpes->tuples_needed, queryDesc->planstate);
+
+	/*
+	 * Run the plan.  If we specified a tuple bound, be careful not to demand
+	 * more tuples than that.
+	 */
+	ExecutorRun(queryDesc,
+				ForwardScanDirection,
+				fpes->tuples_needed < 0 ? (int64) 0 : fpes->tuples_needed,
+				true);
 
 	/* Shut down the executor */
 	ExecutorFinish(queryDesc);
...
src/backend/executor/execProcnode.c

@@ -757,3 +757,124 @@ ExecShutdownNode(PlanState *node)
 	return false;
 }
+
+/*
+ * ExecSetTupleBound
+ *
+ * Set a tuple bound for a planstate node.  This lets child plan nodes
+ * optimize based on the knowledge that the maximum number of tuples that
+ * their parent will demand is limited.  The tuple bound for a node may
+ * only be changed between scans (i.e., after node initialization or just
+ * before an ExecReScan call).
+ *
+ * Any negative tuples_needed value means "no limit", which should be the
+ * default assumption when this is not called at all for a particular node.
+ *
+ * Note: if this is called repeatedly on a plan tree, the exact same set
+ * of nodes must be updated with the new limit each time; be careful that
+ * only unchanging conditions are tested here.
+ */
+void
+ExecSetTupleBound(int64 tuples_needed, PlanState *child_node)
+{
+	/*
+	 * Since this function recurses, in principle we should check stack depth
+	 * here.  In practice, it's probably pointless since the earlier node
+	 * initialization tree traversal would surely have consumed more stack.
+	 */
+
+	if (IsA(child_node, SortState))
+	{
+		/*
+		 * If it is a Sort node, notify it that it can use bounded sort.
+		 *
+		 * Note: it is the responsibility of nodeSort.c to react properly to
+		 * changes of these parameters.  If we ever redesign this, it'd be a
+		 * good idea to integrate this signaling with the parameter-change
+		 * mechanism.
+		 */
+		SortState  *sortState = (SortState *) child_node;
+
+		if (tuples_needed < 0)
+		{
+			/* make sure flag gets reset if needed upon rescan */
+			sortState->bounded = false;
+		}
+		else
+		{
+			sortState->bounded = true;
+			sortState->bound = tuples_needed;
+		}
+	}
+	else if (IsA(child_node, MergeAppendState))
+	{
+		/*
+		 * If it is a MergeAppend, we can apply the bound to any nodes that
+		 * are children of the MergeAppend, since the MergeAppend surely need
+		 * read no more than that many tuples from any one input.
+		 */
+		MergeAppendState *maState = (MergeAppendState *) child_node;
+		int			i;
+
+		for (i = 0; i < maState->ms_nplans; i++)
+			ExecSetTupleBound(tuples_needed, maState->mergeplans[i]);
+	}
+	else if (IsA(child_node, ResultState))
+	{
+		/*
+		 * Similarly, for a projecting Result, we can apply the bound to its
+		 * child node.
+		 *
+		 * If Result supported qual checking, we'd have to punt on seeing a
+		 * qual.  Note that having a resconstantqual is not a showstopper: if
+		 * that condition succeeds it affects nothing, while if it fails, no
+		 * rows will be demanded from the Result child anyway.
+		 */
+		if (outerPlanState(child_node))
+			ExecSetTupleBound(tuples_needed, outerPlanState(child_node));
+	}
+	else if (IsA(child_node, SubqueryScanState))
+	{
+		/*
+		 * We can also descend through SubqueryScan, but only if it has no
+		 * qual (otherwise it might discard rows).
+		 */
+		SubqueryScanState *subqueryState = (SubqueryScanState *) child_node;
+
+		if (subqueryState->ss.ps.qual == NULL)
+			ExecSetTupleBound(tuples_needed, subqueryState->subplan);
+	}
+	else if (IsA(child_node, GatherState))
+	{
+		/*
+		 * A Gather node can propagate the bound to its workers.  As with
+		 * MergeAppend, no one worker could possibly need to return more
+		 * tuples than the Gather itself needs to.
+		 *
+		 * Note: As with Sort, the Gather node is responsible for reacting
+		 * properly to changes to this parameter.
+		 */
+		GatherState *gstate = (GatherState *) child_node;
+
+		gstate->tuples_needed = tuples_needed;
+
+		/* Also pass down the bound to our own copy of the child plan */
+		ExecSetTupleBound(tuples_needed, outerPlanState(child_node));
+	}
+	else if (IsA(child_node, GatherMergeState))
+	{
+		/* Same comments as for Gather */
+		GatherMergeState *gstate = (GatherMergeState *) child_node;
+
+		gstate->tuples_needed = tuples_needed;
+		ExecSetTupleBound(tuples_needed, outerPlanState(child_node));
+	}
+
+	/*
+	 * In principle we could descend through any plan node type that is
+	 * certain not to discard or combine input rows; but on seeing a node that
+	 * can do that, we can't propagate the bound any further.  For the moment
+	 * it's unclear that any other cases are worth checking here.
+	 */
+}
src/backend/executor/nodeGather.c

@@ -72,6 +72,7 @@ ExecInitGather(Gather *node, EState *estate, int eflags)
 	gatherstate->ps.state = estate;
 	gatherstate->ps.ExecProcNode = ExecGather;
 	gatherstate->need_to_scan_locally = !node->single_copy;
+	gatherstate->tuples_needed = -1;
 
 	/*
 	 * Miscellaneous initialization
@@ -156,7 +157,8 @@ ExecGather(PlanState *pstate)
 		if (!node->pei)
 			node->pei = ExecInitParallelPlan(node->ps.lefttree,
 											 estate,
-											 gather->num_workers);
+											 gather->num_workers,
+											 node->tuples_needed);
 
 		/*
 		 * Register backend workers.  We might not get as many as we
...
src/backend/executor/nodeGatherMerge.c

@@ -77,6 +77,7 @@ ExecInitGatherMerge(GatherMerge *node, EState *estate, int eflags)
 	gm_state->ps.plan = (Plan *) node;
 	gm_state->ps.state = estate;
 	gm_state->ps.ExecProcNode = ExecGatherMerge;
+	gm_state->tuples_needed = -1;
 
 	/*
 	 * Miscellaneous initialization
@@ -190,7 +191,8 @@ ExecGatherMerge(PlanState *pstate)
 		if (!node->pei)
 			node->pei = ExecInitParallelPlan(node->ps.lefttree,
 											 estate,
-											 gm->num_workers);
+											 gm->num_workers,
+											 node->tuples_needed);
 
 		/* Try to launch workers. */
 		pcxt = node->pei->pcxt;
...
src/backend/executor/nodeLimit.c

@@ -27,7 +27,7 @@
 #include "nodes/nodeFuncs.h"
 
 static void recompute_limits(LimitState *node);
-static void pass_down_bound(LimitState *node, PlanState *child_node);
+static int64 compute_tuples_needed(LimitState *node);
 
 
 /* ----------------------------------------------------------------
@@ -297,92 +297,26 @@ recompute_limits(LimitState *node)
 	/* Set state-machine state */
 	node->lstate = LIMIT_RESCAN;
 
-	/* Notify child node about limit, if useful */
-	pass_down_bound(node, outerPlanState(node));
+	/*
+	 * Notify child node about limit.  Note: think not to "optimize" by
+	 * skipping ExecSetTupleBound if compute_tuples_needed returns < 0.  We
+	 * must update the child node anyway, in case this is a rescan and the
+	 * previous time we got a different result.
+	 */
+	ExecSetTupleBound(compute_tuples_needed(node), outerPlanState(node));
 }
 
 /*
- * If we have a COUNT, and our input is a Sort node, notify it that it can
- * use bounded sort.  We can also pass down the bound through plan nodes
- * that cannot remove or combine input rows; for example, if our input is a
- * MergeAppend, we can apply the same bound to any Sorts that are direct
- * children of the MergeAppend, since the MergeAppend surely need not read
- * more than that many tuples from any one input.
- *
- * This is a bit of a kluge, but we don't have any more-abstract way of
- * communicating between the two nodes; and it doesn't seem worth trying
- * to invent one without some more examples of special communication needs.
- *
- * Note: it is the responsibility of nodeSort.c to react properly to
- * changes of these parameters.  If we ever do redesign this, it'd be a
- * good idea to integrate this signaling with the parameter-change mechanism.
+ * Compute the maximum number of tuples needed to satisfy this Limit node.
+ * Return a negative value if there is not a determinable limit.
  */
-static void
-pass_down_bound(LimitState *node, PlanState *child_node)
+static int64
+compute_tuples_needed(LimitState *node)
 {
-	/*
-	 * Since this function recurses, in principle we should check stack depth
-	 * here.  In practice, it's probably pointless since the earlier node
-	 * initialization tree traversal would surely have consumed more stack.
-	 */
-
-	if (IsA(child_node, SortState))
-	{
-		SortState  *sortState = (SortState *) child_node;
-		int64		tuples_needed = node->count + node->offset;
-
-		/* negative test checks for overflow in sum */
-		if (node->noCount || tuples_needed < 0)
-		{
-			/* make sure flag gets reset if needed upon rescan */
-			sortState->bounded = false;
-		}
-		else
-		{
-			sortState->bounded = true;
-			sortState->bound = tuples_needed;
-		}
-	}
-	else if (IsA(child_node, MergeAppendState))
-	{
-		/* Pass down the bound through MergeAppend */
-		MergeAppendState *maState = (MergeAppendState *) child_node;
-		int			i;
-
-		for (i = 0; i < maState->ms_nplans; i++)
-			pass_down_bound(node, maState->mergeplans[i]);
-	}
-	else if (IsA(child_node, ResultState))
-	{
-		/*
-		 * We also have to be prepared to look through a Result, since the
-		 * planner might stick one atop MergeAppend for projection purposes.
-		 *
-		 * If Result supported qual checking, we'd have to punt on seeing a
-		 * qual.  Note that having a resconstantqual is not a showstopper: if
-		 * that fails we're not getting any rows at all.
-		 */
-		if (outerPlanState(child_node))
-			pass_down_bound(node, outerPlanState(child_node));
-	}
-	else if (IsA(child_node, SubqueryScanState))
-	{
-		/*
-		 * We can also look through SubqueryScan, but only if it has no qual
-		 * (otherwise it might discard rows).
-		 */
-		SubqueryScanState *subqueryState = (SubqueryScanState *) child_node;
-
-		if (subqueryState->ss.ps.qual == NULL)
-			pass_down_bound(node, subqueryState->subplan);
-	}
-
-	/*
-	 * In principle we could look through any plan node type that is certain
-	 * not to discard or combine input rows.  In practice, there are not many
-	 * node types that the planner might put between Sort and Limit, so trying
-	 * to be very general is not worth the trouble.
-	 */
+	if (node->noCount)
+		return -1;
+	/* Note: if this overflows, we'll return a negative value, which is OK */
+	return node->count + node->offset;
 }
 
 /* ----------------------------------------------------------------
...
src/include/executor/execParallel.h

@@ -33,7 +33,7 @@ typedef struct ParallelExecutorInfo
 } ParallelExecutorInfo;
 
 extern ParallelExecutorInfo *ExecInitParallelPlan(PlanState *planstate,
-					 EState *estate, int nworkers);
+					 EState *estate, int nworkers, int64 tuples_needed);
 extern void ExecParallelFinish(ParallelExecutorInfo *pei);
 extern void ExecParallelCleanup(ParallelExecutorInfo *pei);
 extern void ExecParallelReinitialize(ParallelExecutorInfo *pei);
...
src/include/executor/executor.h

@@ -232,6 +232,7 @@ extern PlanState *ExecInitNode(Plan *node, EState *estate, int eflags);
 extern Node *MultiExecProcNode(PlanState *node);
 extern void ExecEndNode(PlanState *node);
 extern bool ExecShutdownNode(PlanState *node);
+extern void ExecSetTupleBound(int64 tuples_needed, PlanState *child_node);
 
 /* ----------------------------------------------------------------
...
src/include/nodes/execnodes.h

@@ -1919,6 +1919,7 @@ typedef struct GatherState
 	struct TupleQueueReader **reader;
 	TupleTableSlot *funnel_slot;
 	bool		need_to_scan_locally;
+	int64		tuples_needed;	/* tuple bound, see ExecSetTupleBound */
 } GatherState;
 
 /* ----------------
@@ -1944,6 +1945,7 @@ typedef struct GatherMergeState
 	struct binaryheap *gm_heap; /* binary heap of slot indices */
 	bool		gm_initialized; /* gather merge initilized ? */
 	bool		need_to_scan_locally;
+	int64		tuples_needed;	/* tuple bound, see ExecSetTupleBound */
 	int			gm_nkeys;
 	SortSupport gm_sortkeys;	/* array of length ms_nkeys */
 	struct GMReaderTupleBuffer *gm_tuple_buffers;	/* tuple buffer per reader */
...
src/test/regress/expected/select_parallel.out

@@ -300,6 +300,29 @@ select count(*) from tenk1 group by twenty;
    500
 (20 rows)
 
+reset enable_hashagg;
+-- gather merge test with a LIMIT
+explain (costs off)
+select fivethous from tenk1 order by fivethous limit 4;
+                  QUERY PLAN
+----------------------------------------------
+ Limit
+   ->  Gather Merge
+         Workers Planned: 4
+         ->  Sort
+               Sort Key: fivethous
+               ->  Parallel Seq Scan on tenk1
+(6 rows)
+
+select fivethous from tenk1 order by fivethous limit 4;
+ fivethous
+-----------
+         0
+         0
+         1
+         1
+(4 rows)
+
 -- gather merge test with 0 worker
 set max_parallel_workers = 0;
 explain (costs off)
@@ -325,7 +348,6 @@ select string4 from tenk1 order by string4 limit 5;
 (5 rows)
 
 reset max_parallel_workers;
-reset enable_hashagg;
 SAVEPOINT settings;
 SET LOCAL force_parallel_mode = 1;
 explain (costs off)
...
src/test/regress/sql/select_parallel.sql

@@ -118,13 +118,20 @@ explain (costs off)
 select count(*) from tenk1 group by twenty;
+reset enable_hashagg;
+
+-- gather merge test with a LIMIT
+explain (costs off)
+select fivethous from tenk1 order by fivethous limit 4;
+select fivethous from tenk1 order by fivethous limit 4;
+
 -- gather merge test with 0 worker
 set max_parallel_workers = 0;
 explain (costs off)
 select string4 from tenk1 order by string4 limit 5;
 select string4 from tenk1 order by string4 limit 5;
 reset max_parallel_workers;
-reset enable_hashagg;
 
 SAVEPOINT settings;
 SET LOCAL force_parallel_mode = 1;
...