Commit 8538a630 authored by Robert Haas's avatar Robert Haas

Make Gather node projection-capable.

The original Gather code failed to mark a Gather node as not able to
do projection, but it couldn't, even though it did call initialize its
projection info via ExecAssignProjectionInfo.  There doesn't seem to
be any good reason for this node not to have projection capability,
so clean things up so that it does.  Without this, plans using Gather
nodes might need to carry extra Result nodes to do projection.
parent c15898c1
...@@ -36,6 +36,7 @@ ...@@ -36,6 +36,7 @@
#include "executor/nodeGather.h" #include "executor/nodeGather.h"
#include "executor/nodeSubplan.h" #include "executor/nodeSubplan.h"
#include "executor/tqueue.h" #include "executor/tqueue.h"
#include "utils/memutils.h"
#include "utils/rel.h" #include "utils/rel.h"
...@@ -50,6 +51,9 @@ GatherState * ...@@ -50,6 +51,9 @@ GatherState *
ExecInitGather(Gather *node, EState *estate, int eflags) ExecInitGather(Gather *node, EState *estate, int eflags)
{ {
GatherState *gatherstate; GatherState *gatherstate;
Plan *outerNode;
bool hasoid;
TupleDesc tupDesc;
/* Gather node doesn't have innerPlan node. */ /* Gather node doesn't have innerPlan node. */
Assert(innerPlan(node) == NULL); Assert(innerPlan(node) == NULL);
...@@ -82,13 +86,14 @@ ExecInitGather(Gather *node, EState *estate, int eflags) ...@@ -82,13 +86,14 @@ ExecInitGather(Gather *node, EState *estate, int eflags)
/* /*
* tuple table initialization * tuple table initialization
*/ */
gatherstate->funnel_slot = ExecInitExtraTupleSlot(estate);
ExecInitResultTupleSlot(estate, &gatherstate->ps); ExecInitResultTupleSlot(estate, &gatherstate->ps);
/* /*
* now initialize outer plan * now initialize outer plan
*/ */
outerPlanState(gatherstate) = ExecInitNode(outerPlan(node), estate, eflags); outerNode = outerPlan(node);
outerPlanState(gatherstate) = ExecInitNode(outerNode, estate, eflags);
gatherstate->ps.ps_TupFromTlist = false; gatherstate->ps.ps_TupFromTlist = false;
...@@ -98,6 +103,14 @@ ExecInitGather(Gather *node, EState *estate, int eflags) ...@@ -98,6 +103,14 @@ ExecInitGather(Gather *node, EState *estate, int eflags)
ExecAssignResultTypeFromTL(&gatherstate->ps); ExecAssignResultTypeFromTL(&gatherstate->ps);
ExecAssignProjectionInfo(&gatherstate->ps, NULL); ExecAssignProjectionInfo(&gatherstate->ps, NULL);
/*
* Initialize funnel slot to same tuple descriptor as outer plan.
*/
if (!ExecContextForcesOids(&gatherstate->ps, &hasoid))
hasoid = false;
tupDesc = ExecTypeFromTL(outerNode->targetlist, hasoid);
ExecSetSlotDescriptor(gatherstate->funnel_slot, tupDesc);
return gatherstate; return gatherstate;
} }
...@@ -113,6 +126,9 @@ ExecGather(GatherState *node) ...@@ -113,6 +126,9 @@ ExecGather(GatherState *node)
{ {
int i; int i;
TupleTableSlot *slot; TupleTableSlot *slot;
TupleTableSlot *resultSlot;
ExprDoneCond isDone;
ExprContext *econtext;
/* /*
* Initialize the parallel context and workers on first execution. We do * Initialize the parallel context and workers on first execution. We do
...@@ -169,7 +185,53 @@ ExecGather(GatherState *node) ...@@ -169,7 +185,53 @@ ExecGather(GatherState *node)
node->initialized = true; node->initialized = true;
} }
slot = gather_getnext(node); /*
* Check to see if we're still projecting out tuples from a previous scan
* tuple (because there is a function-returning-set in the projection
* expressions). If so, try to project another one.
*/
if (node->ps.ps_TupFromTlist)
{
resultSlot = ExecProject(node->ps.ps_ProjInfo, &isDone);
if (isDone == ExprMultipleResult)
return resultSlot;
/* Done with that source tuple... */
node->ps.ps_TupFromTlist = false;
}
/*
* Reset per-tuple memory context to free any expression evaluation
* storage allocated in the previous tuple cycle. Note we can't do this
* until we're done projecting.
*/
econtext = node->ps.ps_ExprContext;
ResetExprContext(econtext);
/* Get and return the next tuple, projecting if necessary. */
for (;;)
{
/*
* Get next tuple, either from one of our workers, or by running the
* plan ourselves.
*/
slot = gather_getnext(node);
if (TupIsNull(slot))
return NULL;
/*
* form the result tuple using ExecProject(), and return it --- unless
* the projection produces an empty set, in which case we must loop
* back around for another tuple
*/
econtext->ecxt_outertuple = slot;
resultSlot = ExecProject(node->ps.ps_ProjInfo, &isDone);
if (isDone != ExprEndResult)
{
node->ps.ps_TupFromTlist = (isDone == ExprMultipleResult);
return resultSlot;
}
}
return slot; return slot;
} }
...@@ -201,18 +263,11 @@ ExecEndGather(GatherState *node) ...@@ -201,18 +263,11 @@ ExecEndGather(GatherState *node)
static TupleTableSlot * static TupleTableSlot *
gather_getnext(GatherState *gatherstate) gather_getnext(GatherState *gatherstate)
{ {
PlanState *outerPlan; PlanState *outerPlan = outerPlanState(gatherstate);
TupleTableSlot *outerTupleSlot; TupleTableSlot *outerTupleSlot;
TupleTableSlot *slot; TupleTableSlot *fslot = gatherstate->funnel_slot;
HeapTuple tup; HeapTuple tup;
/*
* We can use projection info of Gather for the tuples received from
* worker backends as currently for all cases worker backends sends the
* projected tuple as required by Gather node.
*/
slot = gatherstate->ps.ps_ProjInfo->pi_slot;
while (gatherstate->funnel != NULL || gatherstate->need_to_scan_locally) while (gatherstate->funnel != NULL || gatherstate->need_to_scan_locally)
{ {
if (gatherstate->funnel != NULL) if (gatherstate->funnel != NULL)
...@@ -229,19 +284,17 @@ gather_getnext(GatherState *gatherstate) ...@@ -229,19 +284,17 @@ gather_getnext(GatherState *gatherstate)
if (HeapTupleIsValid(tup)) if (HeapTupleIsValid(tup))
{ {
ExecStoreTuple(tup, /* tuple to store */ ExecStoreTuple(tup, /* tuple to store */
slot, /* slot to store in */ fslot, /* slot in which to store the tuple */
InvalidBuffer, /* buffer associated with this InvalidBuffer, /* buffer associated with this
* tuple */ * tuple */
true); /* pfree this pointer if not from heap */ true); /* pfree this pointer if not from heap */
return slot; return fslot;
} }
} }
if (gatherstate->need_to_scan_locally) if (gatherstate->need_to_scan_locally)
{ {
outerPlan = outerPlanState(gatherstate);
outerTupleSlot = ExecProcNode(outerPlan); outerTupleSlot = ExecProcNode(outerPlan);
if (!TupIsNull(outerTupleSlot)) if (!TupIsNull(outerTupleSlot))
...@@ -251,7 +304,7 @@ gather_getnext(GatherState *gatherstate) ...@@ -251,7 +304,7 @@ gather_getnext(GatherState *gatherstate)
} }
} }
return ExecClearTuple(slot); return ExecClearTuple(fslot);
} }
/* ---------------------------------------------------------------- /* ----------------------------------------------------------------
......
...@@ -602,12 +602,15 @@ set_plan_refs(PlannerInfo *root, Plan *plan, int rtoffset) ...@@ -602,12 +602,15 @@ set_plan_refs(PlannerInfo *root, Plan *plan, int rtoffset)
set_join_references(root, (Join *) plan, rtoffset); set_join_references(root, (Join *) plan, rtoffset);
break; break;
case T_Gather:
set_upper_references(root, plan, rtoffset);
break;
case T_Hash: case T_Hash:
case T_Material: case T_Material:
case T_Sort: case T_Sort:
case T_Unique: case T_Unique:
case T_SetOp: case T_SetOp:
case T_Gather:
/* /*
* These plan types don't actually bother to evaluate their * These plan types don't actually bother to evaluate their
......
...@@ -1964,6 +1964,7 @@ typedef struct GatherState ...@@ -1964,6 +1964,7 @@ typedef struct GatherState
bool initialized; bool initialized;
struct ParallelExecutorInfo *pei; struct ParallelExecutorInfo *pei;
struct TupleQueueFunnel *funnel; struct TupleQueueFunnel *funnel;
TupleTableSlot *funnel_slot;
bool need_to_scan_locally; bool need_to_scan_locally;
} GatherState; } GatherState;
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment