Commit 2103b7ba authored by Tom Lane

Phase 2 of hashed-aggregation project. nodeAgg.c now knows how to do

hashed aggregation, but there's not yet planner support for it.
parent fc9814d1
...@@ -29,24 +29,23 @@ ...@@ -29,24 +29,23 @@
* of course). A non-strict finalfunc can make its own choice of * of course). A non-strict finalfunc can make its own choice of
* what to return for a NULL ending transvalue. * what to return for a NULL ending transvalue.
* *
* When the transvalue datatype is pass-by-reference, we have to be * We compute aggregate input expressions and run the transition functions
* careful to ensure that the values survive across tuple cycles yet * in a temporary econtext (aggstate->tmpcontext). This is reset at
* are not allowed to accumulate until end of query. We do this by * least once per input tuple, so when the transvalue datatype is
* "ping-ponging" between two memory contexts; successive calls to the * pass-by-reference, we have to be careful to copy it into a longer-lived
* transfunc are executed in alternate contexts, passing the previous * memory context, and free the prior value to avoid memory leakage.
* transvalue that is in the other context. At the beginning of each * We store transvalues in the memory context aggstate->aggcontext,
* tuple cycle we can reset the current output context to avoid memory * which is also used for the hashtable structures in AGG_HASHED mode.
* usage growth. Note: we must use MemoryContextContains() to check * The node's regular econtext (aggstate->csstate.cstate.cs_ExprContext)
* whether the transfunc has perhaps handed us back one of its input * is used to run finalize functions and compute the output tuple;
* values rather than a freshly palloc'd value; if so, we copy the value * this context can be reset once per output tuple.
* to the context we want it in.
* *
* *
* Portions Copyright (c) 1996-2002, PostgreSQL Global Development Group * Portions Copyright (c) 1996-2002, PostgreSQL Global Development Group
* Portions Copyright (c) 1994, Regents of the University of California * Portions Copyright (c) 1994, Regents of the University of California
* *
* IDENTIFICATION * IDENTIFICATION
* $Header: /cvsroot/pgsql/src/backend/executor/nodeAgg.c,v 1.91 2002/11/06 00:00:43 tgl Exp $ * $Header: /cvsroot/pgsql/src/backend/executor/nodeAgg.c,v 1.92 2002/11/06 22:31:23 tgl Exp $
* *
*------------------------------------------------------------------------- *-------------------------------------------------------------------------
*/ */
...@@ -59,6 +58,7 @@ ...@@ -59,6 +58,7 @@
#include "executor/executor.h" #include "executor/executor.h"
#include "executor/nodeAgg.h" #include "executor/nodeAgg.h"
#include "executor/nodeGroup.h" #include "executor/nodeGroup.h"
#include "executor/nodeHash.h"
#include "miscadmin.h" #include "miscadmin.h"
#include "optimizer/clauses.h" #include "optimizer/clauses.h"
#include "parser/parse_coerce.h" #include "parser/parse_coerce.h"
...@@ -140,8 +140,27 @@ typedef struct AggStatePerAggData ...@@ -140,8 +140,27 @@ typedef struct AggStatePerAggData
*/ */
Tuplesortstate *sortstate; /* sort object, if a DISTINCT agg */ Tuplesortstate *sortstate; /* sort object, if a DISTINCT agg */
} AggStatePerAggData;
Datum transValue; /*
* AggStatePerGroupData - per-aggregate-per-group working state
*
* These values are working state that is initialized at the start of
* an input tuple group and updated for each input tuple.
*
* In AGG_PLAIN and AGG_SORTED modes, we have a single array of these
* structs (pointed to by aggstate->pergroup); we re-use the array for
* each input group, if it's AGG_SORTED mode. In AGG_HASHED mode, the
* hash table contains an array of these structs for each tuple group.
*
* Logically, the sortstate field belongs in this struct, but we do not
* keep it here for space reasons: we don't support DISTINCT aggregates
* in AGG_HASHED mode, so there's no reason to use up a pointer field
* in every entry of the hashtable.
*/
typedef struct AggStatePerGroupData
{
Datum transValue; /* current transition value */
bool transValueIsNull; bool transValueIsNull;
bool noTransValue; /* true if transValue not set yet */ bool noTransValue; /* true if transValue not set yet */
...@@ -154,97 +173,143 @@ typedef struct AggStatePerAggData ...@@ -154,97 +173,143 @@ typedef struct AggStatePerAggData
* later input value. Only the first non-NULL input will be * later input value. Only the first non-NULL input will be
* auto-substituted. * auto-substituted.
*/ */
} AggStatePerAggData; } AggStatePerGroupData;
static void initialize_aggregate(AggStatePerAgg peraggstate); /*
static void advance_transition_function(AggStatePerAgg peraggstate, * To implement hashed aggregation, we need a hashtable that stores a
Datum newVal, bool isNull); * representative tuple and an array of AggStatePerGroup structs for each
static void advance_aggregates(AggState *aggstate, ExprContext *econtext); * distinct set of GROUP BY column values. We compute the hash key from
* the GROUP BY columns.
*/
/*
 * AggHashEntryData - one hash table entry, i.e. one tuple group.
 *
 * Entries that fall into the same bucket are chained through "next";
 * lookup_hash_entry() pushes each new entry onto the front of its chain.
 *
 * The struct is declared with a single pergroup element, but each entry is
 * allocated with room for one AggStatePerGroupData per aggregate:
 *		sizeof(AggHashEntryData) + (numaggs - 1) * sizeof(AggStatePerGroupData)
 * so pergroup[] has to stay the final member.
 */
typedef struct AggHashEntryData
{
AggHashEntry next; /* next entry in same hash bucket */
uint32 hashkey; /* exact hash key of this entry */
/* made with heap_copytuple() while aggcontext is the current context */
HeapTuple firstTuple; /* copy of first tuple in this group */
/* per-aggregate transition status array - must be last! */
AggStatePerGroupData pergroup[1]; /* VARIABLE LENGTH ARRAY */
} AggHashEntryData; /* VARIABLE LENGTH STRUCT */
/*
 * AggHashTableData - the hash table proper: a count plus an array of
 * bucket chain heads.
 *
 * The struct is declared with a single bucket, but build_hash_table()
 * allocates it as
 *		sizeof(AggHashTableData) + (numGroups - 1) * sizeof(AggHashEntry)
 * and zero-fills it.  lookup_hash_entry() picks a bucket with
 * hashkey % nbuckets.
 */
typedef struct AggHashTableData
{
int nbuckets; /* number of buckets in hash table */
AggHashEntry buckets[1]; /* VARIABLE LENGTH ARRAY */
} AggHashTableData; /* VARIABLE LENGTH STRUCT */
static void initialize_aggregates(AggState *aggstate,
AggStatePerAgg peragg,
AggStatePerGroup pergroup);
static void advance_transition_function(AggState *aggstate,
AggStatePerAgg peraggstate,
AggStatePerGroup pergroupstate,
Datum newVal, bool isNull);
static void advance_aggregates(AggState *aggstate, AggStatePerGroup pergroup);
static void process_sorted_aggregate(AggState *aggstate, static void process_sorted_aggregate(AggState *aggstate,
AggStatePerAgg peraggstate); AggStatePerAgg peraggstate,
static void finalize_aggregate(AggStatePerAgg peraggstate, AggStatePerGroup pergroupstate);
Datum *resultVal, bool *resultIsNull); static void finalize_aggregate(AggState *aggstate,
AggStatePerAgg peraggstate,
AggStatePerGroup pergroupstate,
Datum *resultVal, bool *resultIsNull);
static void build_hash_table(Agg *node);
static AggHashEntry lookup_hash_entry(Agg *node, TupleTableSlot *slot);
static TupleTableSlot *agg_retrieve_direct(Agg *node);
static void agg_fill_hash_table(Agg *node);
static TupleTableSlot *agg_retrieve_hash_table(Agg *node);
static Datum GetAggInitVal(Datum textInitVal, Oid transtype); static Datum GetAggInitVal(Datum textInitVal, Oid transtype);
/* /*
* Initialize one aggregate for a new set of input values. * Initialize all aggregates for a new group of input values.
* *
* When called, CurrentMemoryContext should be the per-query context. * When called, CurrentMemoryContext should be the per-query context.
*/ */
static void static void
initialize_aggregate(AggStatePerAgg peraggstate) initialize_aggregates(AggState *aggstate,
AggStatePerAgg peragg,
AggStatePerGroup pergroup)
{ {
Aggref *aggref = peraggstate->aggref; int aggno;
/* for (aggno = 0; aggno < aggstate->numaggs; aggno++)
* Start a fresh sort operation for each DISTINCT aggregate.
*/
if (aggref->aggdistinct)
{ {
AggStatePerAgg peraggstate = &peragg[aggno];
AggStatePerGroup pergroupstate = &pergroup[aggno];
Aggref *aggref = peraggstate->aggref;
/* /*
* In case of rescan, maybe there could be an uncompleted sort * Start a fresh sort operation for each DISTINCT aggregate.
* operation? Clean it up if so.
*/ */
if (peraggstate->sortstate) if (aggref->aggdistinct)
tuplesort_end(peraggstate->sortstate); {
/*
* In case of rescan, maybe there could be an uncompleted sort
* operation? Clean it up if so.
*/
if (peraggstate->sortstate)
tuplesort_end(peraggstate->sortstate);
peraggstate->sortstate = peraggstate->sortstate =
tuplesort_begin_datum(peraggstate->inputType, tuplesort_begin_datum(peraggstate->inputType,
peraggstate->sortOperator, peraggstate->sortOperator,
false); false);
} }
/* /*
* (Re)set transValue to the initial value. * (Re)set transValue to the initial value.
* *
* Note that when the initial value is pass-by-ref, we just reuse it * Note that when the initial value is pass-by-ref, we must copy it
* without copying for each group. Hence, transition function had * (into the aggcontext) since we will pfree the transValue later.
* better not scribble on its input, or it will fail for GROUP BY! */
*/ if (peraggstate->initValueIsNull)
peraggstate->transValue = peraggstate->initValue; pergroupstate->transValue = peraggstate->initValue;
peraggstate->transValueIsNull = peraggstate->initValueIsNull; else
{
MemoryContext oldContext;
/* oldContext = MemoryContextSwitchTo(aggstate->aggcontext);
* If the initial value for the transition state doesn't exist in the pergroupstate->transValue = datumCopy(peraggstate->initValue,
* pg_aggregate table then we will let the first non-NULL value peraggstate->transtypeByVal,
* returned from the outer procNode become the initial value. (This is peraggstate->transtypeLen);
* useful for aggregates like max() and min().) The noTransValue flag MemoryContextSwitchTo(oldContext);
* signals that we still need to do this. }
*/ pergroupstate->transValueIsNull = peraggstate->initValueIsNull;
peraggstate->noTransValue = peraggstate->initValueIsNull;
/*
* If the initial value for the transition state doesn't exist in the
* pg_aggregate table then we will let the first non-NULL value
* returned from the outer procNode become the initial value. (This is
* useful for aggregates like max() and min().) The noTransValue flag
* signals that we still need to do this.
*/
pergroupstate->noTransValue = peraggstate->initValueIsNull;
}
} }
/* /*
* Given a new input value, advance the transition function of an aggregate. * Given a new input value, advance the transition function of an aggregate.
* *
* When called, CurrentMemoryContext should be the context we want the * It doesn't matter which memory context this is called in.
* transition function result to be delivered into on this cycle.
*/ */
static void static void
advance_transition_function(AggStatePerAgg peraggstate, advance_transition_function(AggState *aggstate,
AggStatePerAgg peraggstate,
AggStatePerGroup pergroupstate,
Datum newVal, bool isNull) Datum newVal, bool isNull)
{ {
FunctionCallInfoData fcinfo; FunctionCallInfoData fcinfo;
MemoryContext oldContext;
if (peraggstate->transfn.fn_strict) if (peraggstate->transfn.fn_strict)
{ {
/*
* For a strict transfn, nothing happens at a NULL input
* tuple; we just keep the prior transValue.
*/
if (isNull) if (isNull)
{
/*
* For a strict transfn, nothing happens at a NULL input
* tuple; we just keep the prior transValue. However, if the
* transtype is pass-by-ref, we have to copy it into the new
* context because the old one is going to get reset.
*/
if (!peraggstate->transValueIsNull)
peraggstate->transValue = datumCopy(peraggstate->transValue,
peraggstate->transtypeByVal,
peraggstate->transtypeLen);
return; return;
} if (pergroupstate->noTransValue)
if (peraggstate->noTransValue)
{ {
/* /*
* transValue has not been initialized. This is the first * transValue has not been initialized. This is the first
...@@ -253,18 +318,19 @@ advance_transition_function(AggStatePerAgg peraggstate, ...@@ -253,18 +318,19 @@ advance_transition_function(AggStatePerAgg peraggstate,
* is binary-compatible with its transtype, so straight copy * is binary-compatible with its transtype, so straight copy
* here is OK.) * here is OK.)
* *
* We had better copy the datum if it is pass-by-ref, since the * We must copy the datum into aggcontext if it is pass-by-ref.
* given pointer may be pointing into a scan tuple that will * We do not need to pfree the old transValue, since it's NULL.
* be freed on the next iteration of the scan.
*/ */
peraggstate->transValue = datumCopy(newVal, oldContext = MemoryContextSwitchTo(aggstate->aggcontext);
peraggstate->transtypeByVal, pergroupstate->transValue = datumCopy(newVal,
peraggstate->transtypeLen); peraggstate->transtypeByVal,
peraggstate->transValueIsNull = false; peraggstate->transtypeLen);
peraggstate->noTransValue = false; pergroupstate->transValueIsNull = false;
pergroupstate->noTransValue = false;
MemoryContextSwitchTo(oldContext);
return; return;
} }
if (peraggstate->transValueIsNull) if (pergroupstate->transValueIsNull)
{ {
/* /*
* Don't call a strict function with NULL inputs. Note it is * Don't call a strict function with NULL inputs. Note it is
...@@ -277,6 +343,9 @@ advance_transition_function(AggStatePerAgg peraggstate, ...@@ -277,6 +343,9 @@ advance_transition_function(AggStatePerAgg peraggstate,
} }
} }
/* We run the transition functions in per-input-tuple memory context */
oldContext = MemoryContextSwitchTo(aggstate->tmpcontext->ecxt_per_tuple_memory);
/* /*
* OK to call the transition function * OK to call the transition function
* *
...@@ -291,84 +360,76 @@ advance_transition_function(AggStatePerAgg peraggstate, ...@@ -291,84 +360,76 @@ advance_transition_function(AggStatePerAgg peraggstate,
fcinfo.flinfo = &peraggstate->transfn; fcinfo.flinfo = &peraggstate->transfn;
fcinfo.nargs = 2; fcinfo.nargs = 2;
fcinfo.arg[0] = peraggstate->transValue; fcinfo.arg[0] = pergroupstate->transValue;
fcinfo.argnull[0] = peraggstate->transValueIsNull; fcinfo.argnull[0] = pergroupstate->transValueIsNull;
fcinfo.arg[1] = newVal; fcinfo.arg[1] = newVal;
fcinfo.argnull[1] = isNull; fcinfo.argnull[1] = isNull;
newVal = FunctionCallInvoke(&fcinfo); newVal = FunctionCallInvoke(&fcinfo);
/* /*
* If the transition function was uncooperative, it may have given us * If pass-by-ref datatype, must copy the new value into aggcontext and
* a pass-by-ref result that points at the scan tuple or the * pfree the prior transValue. But if transfn returned a pointer to its
* prior-cycle working memory. Copy it into the active context if it * first input, we don't need to do anything.
* doesn't look right.
*/ */
if (!peraggstate->transtypeByVal && !fcinfo.isnull && if (!peraggstate->transtypeByVal &&
!MemoryContextContains(CurrentMemoryContext, DatumGetPointer(newVal) != DatumGetPointer(pergroupstate->transValue))
DatumGetPointer(newVal))) {
newVal = datumCopy(newVal, if (!fcinfo.isnull)
peraggstate->transtypeByVal, {
peraggstate->transtypeLen); MemoryContextSwitchTo(aggstate->aggcontext);
newVal = datumCopy(newVal,
peraggstate->transtypeByVal,
peraggstate->transtypeLen);
}
if (!pergroupstate->transValueIsNull)
pfree(DatumGetPointer(pergroupstate->transValue));
}
pergroupstate->transValue = newVal;
pergroupstate->transValueIsNull = fcinfo.isnull;
peraggstate->transValue = newVal; MemoryContextSwitchTo(oldContext);
peraggstate->transValueIsNull = fcinfo.isnull;
} }
/* /*
* Advance all the aggregates for one input tuple. The input tuple * Advance all the aggregates for one input tuple. The input tuple
* has been stored in econtext->ecxt_scantuple, so that it is accessible * has been stored in tmpcontext->ecxt_scantuple, so that it is accessible
* to ExecEvalExpr. * to ExecEvalExpr. pergroup is the array of per-group structs to use
* (this might be in a hashtable entry).
* *
* When called, CurrentMemoryContext should be the per-query context. * When called, CurrentMemoryContext should be the per-query context.
*/ */
static void static void
advance_aggregates(AggState *aggstate, ExprContext *econtext) advance_aggregates(AggState *aggstate, AggStatePerGroup pergroup)
{ {
MemoryContext oldContext; ExprContext *econtext = aggstate->tmpcontext;
int aggno; int aggno;
/*
* Clear and select the current working context for evaluation
* of the input expressions and transition functions at this
* input tuple.
*/
econtext->ecxt_per_tuple_memory = aggstate->agg_cxt[aggstate->which_cxt];
ResetExprContext(econtext);
oldContext = MemoryContextSwitchTo(econtext->ecxt_per_tuple_memory);
for (aggno = 0; aggno < aggstate->numaggs; aggno++) for (aggno = 0; aggno < aggstate->numaggs; aggno++)
{ {
AggStatePerAgg peraggstate = &aggstate->peragg[aggno]; AggStatePerAgg peraggstate = &aggstate->peragg[aggno];
AggStatePerGroup pergroupstate = &pergroup[aggno];
Aggref *aggref = peraggstate->aggref; Aggref *aggref = peraggstate->aggref;
Datum newVal; Datum newVal;
bool isNull; bool isNull;
newVal = ExecEvalExpr(aggref->target, econtext, &isNull, NULL); newVal = ExecEvalExprSwitchContext(aggref->target, econtext,
&isNull, NULL);
if (aggref->aggdistinct) if (aggref->aggdistinct)
{ {
/* in DISTINCT mode, we may ignore nulls */ /* in DISTINCT mode, we may ignore nulls */
if (isNull) if (isNull)
continue; continue;
/* putdatum has to be called in per-query context */
MemoryContextSwitchTo(oldContext);
tuplesort_putdatum(peraggstate->sortstate, newVal, isNull); tuplesort_putdatum(peraggstate->sortstate, newVal, isNull);
MemoryContextSwitchTo(econtext->ecxt_per_tuple_memory);
} }
else else
{ {
advance_transition_function(peraggstate, newVal, isNull); advance_transition_function(aggstate, peraggstate, pergroupstate,
newVal, isNull);
} }
} }
/*
* Make the other context current so that these transition
* results are preserved.
*/
aggstate->which_cxt = 1 - aggstate->which_cxt;
MemoryContextSwitchTo(oldContext);
} }
/* /*
...@@ -381,10 +442,12 @@ advance_aggregates(AggState *aggstate, ExprContext *econtext) ...@@ -381,10 +442,12 @@ advance_aggregates(AggState *aggstate, ExprContext *econtext)
*/ */
static void static void
process_sorted_aggregate(AggState *aggstate, process_sorted_aggregate(AggState *aggstate,
AggStatePerAgg peraggstate) AggStatePerAgg peraggstate,
AggStatePerGroup pergroupstate)
{ {
Datum oldVal = (Datum) 0; Datum oldVal = (Datum) 0;
bool haveOldVal = false; bool haveOldVal = false;
MemoryContext workcontext = aggstate->tmpcontext->ecxt_per_tuple_memory;
MemoryContext oldContext; MemoryContext oldContext;
Datum newVal; Datum newVal;
bool isNull; bool isNull;
...@@ -408,12 +471,11 @@ process_sorted_aggregate(AggState *aggstate, ...@@ -408,12 +471,11 @@ process_sorted_aggregate(AggState *aggstate,
continue; continue;
/* /*
* Clear and select the current working context for evaluation of * Clear and select the working context for evaluation of
* the equality function and transition function. * the equality function and transition function.
*/ */
MemoryContextReset(aggstate->agg_cxt[aggstate->which_cxt]); MemoryContextReset(workcontext);
oldContext = oldContext = MemoryContextSwitchTo(workcontext);
MemoryContextSwitchTo(aggstate->agg_cxt[aggstate->which_cxt]);
if (haveOldVal && if (haveOldVal &&
DatumGetBool(FunctionCall2(&peraggstate->equalfn, DatumGetBool(FunctionCall2(&peraggstate->equalfn,
...@@ -422,24 +484,15 @@ process_sorted_aggregate(AggState *aggstate, ...@@ -422,24 +484,15 @@ process_sorted_aggregate(AggState *aggstate,
/* equal to prior, so forget this one */ /* equal to prior, so forget this one */
if (!peraggstate->inputtypeByVal) if (!peraggstate->inputtypeByVal)
pfree(DatumGetPointer(newVal)); pfree(DatumGetPointer(newVal));
/*
* note we do NOT flip contexts in this case, so no need to
* copy prior transValue to other context.
*/
} }
else else
{ {
advance_transition_function(peraggstate, newVal, false); advance_transition_function(aggstate, peraggstate, pergroupstate,
newVal, false);
/*
* Make the other context current so that this transition
* result is preserved.
*/
aggstate->which_cxt = 1 - aggstate->which_cxt;
/* forget the old value, if any */ /* forget the old value, if any */
if (haveOldVal && !peraggstate->inputtypeByVal) if (haveOldVal && !peraggstate->inputtypeByVal)
pfree(DatumGetPointer(oldVal)); pfree(DatumGetPointer(oldVal));
/* and remember the new one for subsequent equality checks */
oldVal = newVal; oldVal = newVal;
haveOldVal = true; haveOldVal = true;
} }
...@@ -457,13 +510,19 @@ process_sorted_aggregate(AggState *aggstate, ...@@ -457,13 +510,19 @@ process_sorted_aggregate(AggState *aggstate,
/* /*
* Compute the final value of one aggregate. * Compute the final value of one aggregate.
* *
* When called, CurrentMemoryContext should be the context where we want * The finalfunction will be run, and the result delivered, in the
* final values delivered (ie, the per-output-tuple expression context). * output-tuple context; caller's CurrentMemoryContext does not matter.
*/ */
static void static void
finalize_aggregate(AggStatePerAgg peraggstate, finalize_aggregate(AggState *aggstate,
AggStatePerAgg peraggstate,
AggStatePerGroup pergroupstate,
Datum *resultVal, bool *resultIsNull) Datum *resultVal, bool *resultIsNull)
{ {
MemoryContext oldContext;
oldContext = MemoryContextSwitchTo(aggstate->csstate.cstate.cs_ExprContext->ecxt_per_tuple_memory);
/* /*
* Apply the agg's finalfn if one is provided, else return transValue. * Apply the agg's finalfn if one is provided, else return transValue.
*/ */
...@@ -474,9 +533,9 @@ finalize_aggregate(AggStatePerAgg peraggstate, ...@@ -474,9 +533,9 @@ finalize_aggregate(AggStatePerAgg peraggstate,
MemSet(&fcinfo, 0, sizeof(fcinfo)); MemSet(&fcinfo, 0, sizeof(fcinfo));
fcinfo.flinfo = &peraggstate->finalfn; fcinfo.flinfo = &peraggstate->finalfn;
fcinfo.nargs = 1; fcinfo.nargs = 1;
fcinfo.arg[0] = peraggstate->transValue; fcinfo.arg[0] = pergroupstate->transValue;
fcinfo.argnull[0] = peraggstate->transValueIsNull; fcinfo.argnull[0] = pergroupstate->transValueIsNull;
if (fcinfo.flinfo->fn_strict && peraggstate->transValueIsNull) if (fcinfo.flinfo->fn_strict && pergroupstate->transValueIsNull)
{ {
/* don't call a strict function with NULL inputs */ /* don't call a strict function with NULL inputs */
*resultVal = (Datum) 0; *resultVal = (Datum) 0;
...@@ -490,8 +549,8 @@ finalize_aggregate(AggStatePerAgg peraggstate, ...@@ -490,8 +549,8 @@ finalize_aggregate(AggStatePerAgg peraggstate,
} }
else else
{ {
*resultVal = peraggstate->transValue; *resultVal = pergroupstate->transValue;
*resultIsNull = peraggstate->transValueIsNull; *resultIsNull = pergroupstate->transValueIsNull;
} }
/* /*
...@@ -503,8 +562,111 @@ finalize_aggregate(AggStatePerAgg peraggstate, ...@@ -503,8 +562,111 @@ finalize_aggregate(AggStatePerAgg peraggstate,
*resultVal = datumCopy(*resultVal, *resultVal = datumCopy(*resultVal,
peraggstate->resulttypeByVal, peraggstate->resulttypeByVal,
peraggstate->resulttypeLen); peraggstate->resulttypeLen);
MemoryContextSwitchTo(oldContext);
} }
/*
 * build_hash_table
 *		Create an empty hash table for a hashed Agg node and hang it on the
 *		node's AggState.
 *
 * The table gets node->numGroups buckets.  It is allocated in
 * aggstate->aggcontext and zero-filled, so every bucket chain starts out
 * empty.
 */
static void
build_hash_table(Agg *node)
{
	AggState   *aggstate = node->aggstate;
	AggHashTable newtable;
	Size		nbytes;

	Assert(node->aggstrategy == AGG_HASHED);
	Assert(node->numGroups > 0);

	/* the struct itself already holds one bucket slot, hence the "- 1" */
	nbytes = sizeof(AggHashTableData) +
		(node->numGroups - 1) * sizeof(AggHashEntry);

	newtable = (AggHashTable) MemoryContextAlloc(aggstate->aggcontext,
												 nbytes);
	MemSet(newtable, 0, nbytes);
	newtable->nbuckets = node->numGroups;

	aggstate->hashtable = newtable;
}
/*
 * Find or create a hashtable entry for the tuple group containing the
 * given tuple.
 *
 * The hash key is derived from the grouping columns (node->grpColIdx).
 * The running key is rotated left by one bit before each column's hash is
 * XORed in, so the combination depends on column order: a plain XOR would
 * map (a, b) and (b, a) to the same key and would map every tuple with two
 * equal grouping values to the same key as well.  NULL columns contribute
 * nothing but still take part in the rotation.
 *
 * If a matching entry exists it is returned as-is.  Otherwise a new entry
 * is allocated in aggstate->aggcontext, given a private copy of the tuple,
 * linked at the head of its bucket chain, and its per-group aggregate
 * states are initialized.
 *
 * When called, CurrentMemoryContext should be the per-query context.
 */
static AggHashEntry
lookup_hash_entry(Agg *node, TupleTableSlot *slot)
{
	AggState   *aggstate = node->aggstate;
	AggHashTable hashtable = aggstate->hashtable;
	MemoryContext tmpmem = aggstate->tmpcontext->ecxt_per_tuple_memory;
	HeapTuple	tuple = slot->val;
	TupleDesc	tupdesc = slot->ttc_tupleDescriptor;
	uint32		hashkey = 0;
	int			i;
	int			bucketno;
	AggHashEntry entry;
	MemoryContext oldContext;
	Size		entrysize;

	/* Need to run the hash function in short-lived context */
	oldContext = MemoryContextSwitchTo(tmpmem);

	for (i = 0; i < node->numCols; i++)
	{
		AttrNumber	att = node->grpColIdx[i];
		Datum		attr;
		bool		isNull;

		/* rotate hashkey left 1 bit at each step, making it order-sensitive */
		hashkey = (hashkey << 1) | (hashkey >> 31);

		attr = heap_getattr(tuple, att, tupdesc, &isNull);
		if (isNull)
			continue;			/* treat nulls as having hash key 0 */

		hashkey ^= ComputeHashFunc(attr,
								   (int) tupdesc->attrs[att - 1]->attlen,
								   tupdesc->attrs[att - 1]->attbyval);
	}
	bucketno = hashkey % (uint32) hashtable->nbuckets;

	for (entry = hashtable->buckets[bucketno];
		 entry != NULL;
		 entry = entry->next)
	{
		/* Quick check using hashkey */
		if (entry->hashkey != hashkey)
			continue;
		/* Same key: confirm with a full comparison of the grouping columns */
		if (execTuplesMatch(entry->firstTuple,
							tuple,
							tupdesc,
							node->numCols, node->grpColIdx,
							aggstate->eqfunctions,
							tmpmem))
		{
			MemoryContextSwitchTo(oldContext);
			return entry;
		}
	}

	/* Not there, so build a new one */
	MemoryContextSwitchTo(aggstate->aggcontext);

	/* the struct already holds one pergroup slot, hence the "- 1" */
	entrysize = sizeof(AggHashEntryData) +
		(aggstate->numaggs - 1) * sizeof(AggStatePerGroupData);

	entry = (AggHashEntry) palloc(entrysize);
	MemSet(entry, 0, entrysize);

	entry->hashkey = hashkey;
	entry->firstTuple = heap_copytuple(tuple);

	/* link the new entry at the front of its bucket chain */
	entry->next = hashtable->buckets[bucketno];
	hashtable->buckets[bucketno] = entry;

	MemoryContextSwitchTo(oldContext);

	/* initialize aggregates for new tuple group */
	initialize_aggregates(aggstate, aggstate->peragg, entry->pergroup);

	return entry;
}
/* /*
* ExecAgg - * ExecAgg -
...@@ -521,16 +683,39 @@ finalize_aggregate(AggStatePerAgg peraggstate, ...@@ -521,16 +683,39 @@ finalize_aggregate(AggStatePerAgg peraggstate,
*/ */
TupleTableSlot * TupleTableSlot *
ExecAgg(Agg *node) ExecAgg(Agg *node)
{
AggState *aggstate = node->aggstate;
if (aggstate->agg_done)
return NULL;
if (node->aggstrategy == AGG_HASHED)
{
if (!aggstate->table_filled)
agg_fill_hash_table(node);
return agg_retrieve_hash_table(node);
}
else
{
return agg_retrieve_direct(node);
}
}
/*
* ExecAgg for non-hashed case
*/
static TupleTableSlot *
agg_retrieve_direct(Agg *node)
{ {
AggState *aggstate; AggState *aggstate;
EState *estate;
Plan *outerPlan; Plan *outerPlan;
ExprContext *econtext; ExprContext *econtext;
ExprContext *tmpcontext;
ProjectionInfo *projInfo; ProjectionInfo *projInfo;
Datum *aggvalues; Datum *aggvalues;
bool *aggnulls; bool *aggnulls;
AggStatePerAgg peragg; AggStatePerAgg peragg;
MemoryContext oldContext; AggStatePerGroup pergroup;
TupleTableSlot *outerslot; TupleTableSlot *outerslot;
TupleTableSlot *firstSlot; TupleTableSlot *firstSlot;
TupleTableSlot *resultSlot; TupleTableSlot *resultSlot;
...@@ -540,13 +725,16 @@ ExecAgg(Agg *node) ...@@ -540,13 +725,16 @@ ExecAgg(Agg *node)
* get state info from node * get state info from node
*/ */
aggstate = node->aggstate; aggstate = node->aggstate;
estate = node->plan.state;
outerPlan = outerPlan(node); outerPlan = outerPlan(node);
/* econtext is the per-output-tuple expression context */
econtext = aggstate->csstate.cstate.cs_ExprContext; econtext = aggstate->csstate.cstate.cs_ExprContext;
aggvalues = econtext->ecxt_aggvalues; aggvalues = econtext->ecxt_aggvalues;
aggnulls = econtext->ecxt_aggnulls; aggnulls = econtext->ecxt_aggnulls;
/* tmpcontext is the per-input-tuple expression context */
tmpcontext = aggstate->tmpcontext;
projInfo = aggstate->csstate.cstate.cs_ProjInfo; projInfo = aggstate->csstate.cstate.cs_ProjInfo;
peragg = aggstate->peragg; peragg = aggstate->peragg;
pergroup = aggstate->pergroup;
firstSlot = aggstate->csstate.css_ScanTupleSlot; firstSlot = aggstate->csstate.css_ScanTupleSlot;
/* /*
...@@ -586,17 +774,12 @@ ExecAgg(Agg *node) ...@@ -586,17 +774,12 @@ ExecAgg(Agg *node)
/* /*
* Clear the per-output-tuple context for each group * Clear the per-output-tuple context for each group
*/ */
MemoryContextReset(aggstate->tup_cxt); ResetExprContext(econtext);
/* /*
* Initialize working state for a new input tuple group * Initialize working state for a new input tuple group
*/ */
for (aggno = 0; aggno < aggstate->numaggs; aggno++) initialize_aggregates(aggstate, peragg, pergroup);
{
AggStatePerAgg peraggstate = &peragg[aggno];
initialize_aggregate(peraggstate);
}
if (aggstate->grp_firstTuple != NULL) if (aggstate->grp_firstTuple != NULL)
{ {
...@@ -612,7 +795,7 @@ ExecAgg(Agg *node) ...@@ -612,7 +795,7 @@ ExecAgg(Agg *node)
aggstate->grp_firstTuple = NULL; /* don't keep two pointers */ aggstate->grp_firstTuple = NULL; /* don't keep two pointers */
/* set up for first advance_aggregates call */ /* set up for first advance_aggregates call */
econtext->ecxt_scantuple = firstSlot; tmpcontext->ecxt_scantuple = firstSlot;
/* /*
* Process each outer-plan tuple, and then fetch the next one, * Process each outer-plan tuple, and then fetch the next one,
...@@ -620,7 +803,10 @@ ExecAgg(Agg *node) ...@@ -620,7 +803,10 @@ ExecAgg(Agg *node)
*/ */
for (;;) for (;;)
{ {
advance_aggregates(aggstate, econtext); advance_aggregates(aggstate, pergroup);
/* Reset per-input-tuple context after each tuple */
ResetExprContext(tmpcontext);
outerslot = ExecProcNode(outerPlan, (Plan *) node); outerslot = ExecProcNode(outerPlan, (Plan *) node);
if (TupIsNull(outerslot)) if (TupIsNull(outerslot))
...@@ -630,7 +816,7 @@ ExecAgg(Agg *node) ...@@ -630,7 +816,7 @@ ExecAgg(Agg *node)
break; break;
} }
/* set up for next advance_aggregates call */ /* set up for next advance_aggregates call */
econtext->ecxt_scantuple = outerslot; tmpcontext->ecxt_scantuple = outerslot;
/* /*
* If we are grouping, check whether we've crossed a group * If we are grouping, check whether we've crossed a group
...@@ -643,7 +829,7 @@ ExecAgg(Agg *node) ...@@ -643,7 +829,7 @@ ExecAgg(Agg *node)
firstSlot->ttc_tupleDescriptor, firstSlot->ttc_tupleDescriptor,
node->numCols, node->grpColIdx, node->numCols, node->grpColIdx,
aggstate->eqfunctions, aggstate->eqfunctions,
aggstate->agg_cxt[aggstate->which_cxt])) tmpcontext->ecxt_per_tuple_memory))
{ {
/* /*
* Save the first input tuple of the next group. * Save the first input tuple of the next group.
...@@ -658,37 +844,17 @@ ExecAgg(Agg *node) ...@@ -658,37 +844,17 @@ ExecAgg(Agg *node)
/* /*
* Done scanning input tuple group. Finalize each aggregate * Done scanning input tuple group. Finalize each aggregate
* calculation, and stash results in the per-output-tuple context. * calculation, and stash results in the per-output-tuple context.
*
* This is a bit tricky when there are both DISTINCT and plain
* aggregates: we must first finalize all the plain aggs and then
* all the DISTINCT ones. This is needed because the last
* transition values for the plain aggs are stored in the
* not-current working context, and we have to evaluate those aggs
* (and stash the results in the output tup_cxt!) before we start
* flipping contexts again in process_sorted_aggregate.
*/ */
oldContext = MemoryContextSwitchTo(aggstate->tup_cxt);
for (aggno = 0; aggno < aggstate->numaggs; aggno++)
{
AggStatePerAgg peraggstate = &peragg[aggno];
if (!peraggstate->aggref->aggdistinct)
finalize_aggregate(peraggstate,
&aggvalues[aggno], &aggnulls[aggno]);
}
MemoryContextSwitchTo(oldContext);
for (aggno = 0; aggno < aggstate->numaggs; aggno++) for (aggno = 0; aggno < aggstate->numaggs; aggno++)
{ {
AggStatePerAgg peraggstate = &peragg[aggno]; AggStatePerAgg peraggstate = &peragg[aggno];
AggStatePerGroup pergroupstate = &pergroup[aggno];
if (peraggstate->aggref->aggdistinct) if (peraggstate->aggref->aggdistinct)
{ process_sorted_aggregate(aggstate, peraggstate, pergroupstate);
process_sorted_aggregate(aggstate, peraggstate);
oldContext = MemoryContextSwitchTo(aggstate->tup_cxt); finalize_aggregate(aggstate, peraggstate, pergroupstate,
finalize_aggregate(peraggstate, &aggvalues[aggno], &aggnulls[aggno]);
&aggvalues[aggno], &aggnulls[aggno]);
MemoryContextSwitchTo(oldContext);
}
} }
/* /*
...@@ -737,9 +903,158 @@ ExecAgg(Agg *node) ...@@ -737,9 +903,158 @@ ExecAgg(Agg *node)
} }
/* /*
* Do projection and qual check in the per-output-tuple context. * Form a projection tuple using the aggregate results and the
* representative input tuple. Store it in the result tuple slot.
* Note we do not support aggregates returning sets ...
*/
econtext->ecxt_scantuple = firstSlot;
resultSlot = ExecProject(projInfo, NULL);
/*
* If the completed tuple does not match the qualifications, it is
* ignored and we loop back to try to process another group.
* Otherwise, return the tuple.
*/
}
while (!ExecQual(node->plan.qual, econtext, false));
return resultSlot;
}
/*
 * agg_fill_hash_table
 *		ExecAgg for hashed case, phase 1: read all the input and build the
 *		hash table.
 *
 * Every tuple produced by the outer plan is routed to the hash entry of its
 * group (creating the entry on first sight) and the aggregates are advanced
 * in that entry's per-group state.  The per-input-tuple expression context
 * is reset after each tuple.
 *
 * On exit the table is flagged as filled and the retrieval position is
 * reset (no current entry, bucket 0).
 */
static void
agg_fill_hash_table(Agg *node)
{
	AggState   *aggstate = node->aggstate;
	Plan	   *subplan = outerPlan(node);
	/* expression context used for per-input-tuple work */
	ExprContext *perinput = aggstate->tmpcontext;
	TupleTableSlot *inslot;

	/*
	 * Fetch first, test afterwards: the fetch is kept out of the TupIsNull()
	 * argument so it is evaluated exactly once per tuple.
	 */
	inslot = ExecProcNode(subplan, (Plan *) node);
	while (!TupIsNull(inslot))
	{
		AggHashEntry group;

		/* make the tuple visible to advance_aggregates */
		perinput->ecxt_scantuple = inslot;

		/* locate (or create) the group this tuple belongs to */
		group = lookup_hash_entry(node, inslot);

		/* fold the tuple into that group's transition states */
		advance_aggregates(aggstate, group->pergroup);

		/* release per-input-tuple memory before moving on */
		ResetExprContext(perinput);

		inslot = ExecProcNode(subplan, (Plan *) node);
	}

	/* input exhausted: set up to walk the hash table from the beginning */
	aggstate->next_hash_bucket = 0;
	aggstate->next_hash_entry = NULL;
	aggstate->table_filled = true;
}
/*
* ExecAgg for hashed case: phase 2, retrieving groups from hash table
*/
static TupleTableSlot *
agg_retrieve_hash_table(Agg *node)
{
AggState *aggstate;
ExprContext *econtext;
ProjectionInfo *projInfo;
Datum *aggvalues;
bool *aggnulls;
AggStatePerAgg peragg;
AggStatePerGroup pergroup;
AggHashTable hashtable;
AggHashEntry entry;
TupleTableSlot *firstSlot;
TupleTableSlot *resultSlot;
int aggno;
/*
* get state info from node
*/
aggstate = node->aggstate;
/* econtext is the per-output-tuple expression context */
econtext = aggstate->csstate.cstate.cs_ExprContext;
aggvalues = econtext->ecxt_aggvalues;
aggnulls = econtext->ecxt_aggnulls;
projInfo = aggstate->csstate.cstate.cs_ProjInfo;
peragg = aggstate->peragg;
hashtable = aggstate->hashtable;
firstSlot = aggstate->csstate.css_ScanTupleSlot;
/*
* We loop retrieving groups until we find one matching
* node->plan.qual
*/
do
{
if (aggstate->agg_done)
return NULL;
/*
* Find the next entry in the hash table
*/
entry = aggstate->next_hash_entry;
while (entry == NULL)
{
if (aggstate->next_hash_bucket >= hashtable->nbuckets)
{
/* No more entries in hashtable, so done */
aggstate->agg_done = TRUE;
return NULL;
}
entry = hashtable->buckets[aggstate->next_hash_bucket++];
}
aggstate->next_hash_entry = entry->next;
/*
* Clear the per-output-tuple context for each group
*/
ResetExprContext(econtext);
/*
* Store the copied first input tuple in the tuple table slot
* reserved for it, so that it can be used in ExecProject.
*/ */
econtext->ecxt_per_tuple_memory = aggstate->tup_cxt; ExecStoreTuple(entry->firstTuple,
firstSlot,
InvalidBuffer,
false);
pergroup = entry->pergroup;
/*
* Finalize each aggregate calculation, and stash results in the
* per-output-tuple context.
*/
for (aggno = 0; aggno < aggstate->numaggs; aggno++)
{
AggStatePerAgg peraggstate = &peragg[aggno];
AggStatePerGroup pergroupstate = &pergroup[aggno];
Assert(!peraggstate->aggref->aggdistinct);
finalize_aggregate(aggstate, peraggstate, pergroupstate,
&aggvalues[aggno], &aggnulls[aggno]);
}
/* /*
* Form a projection tuple using the aggregate results and the * Form a projection tuple using the aggregate results and the
...@@ -789,8 +1104,11 @@ ExecInitAgg(Agg *node, EState *estate, Plan *parent) ...@@ -789,8 +1104,11 @@ ExecInitAgg(Agg *node, EState *estate, Plan *parent)
aggstate = makeNode(AggState); aggstate = makeNode(AggState);
node->aggstate = aggstate; node->aggstate = aggstate;
aggstate->eqfunctions = NULL; aggstate->eqfunctions = NULL;
aggstate->grp_firstTuple = NULL; aggstate->peragg = NULL;
aggstate->agg_done = false; aggstate->agg_done = false;
aggstate->pergroup = NULL;
aggstate->grp_firstTuple = NULL;
aggstate->hashtable = NULL;
/* /*
* find aggregates in targetlist and quals * find aggregates in targetlist and quals
...@@ -817,33 +1135,27 @@ ExecInitAgg(Agg *node, EState *estate, Plan *parent) ...@@ -817,33 +1135,27 @@ ExecInitAgg(Agg *node, EState *estate, Plan *parent)
} }
/* /*
* Create expression context * Create expression contexts. We need two, one for per-input-tuple
* processing and one for per-output-tuple processing. We cheat a little
* by using ExecAssignExprContext() to build both.
*/ */
ExecAssignExprContext(estate, &aggstate->csstate.cstate); ExecAssignExprContext(estate, &aggstate->csstate.cstate);
aggstate->tmpcontext = aggstate->csstate.cstate.cs_ExprContext;
ExecAssignExprContext(estate, &aggstate->csstate.cstate);
/* /*
* We actually need three separate expression memory contexts: one for * We also need a long-lived memory context for holding hashtable
* calculating per-output-tuple values (ie, the finished aggregate * data structures and transition values. NOTE: the details of what
* results), and two that we ping-pong between for per-input-tuple * is stored in aggcontext and what is stored in the regular per-query
* evaluation of input expressions and transition functions. The * memory context are driven by a simple decision: we want to reset the
* context made by ExecAssignExprContext() is used as the output * aggcontext in ExecReScanAgg to recover no-longer-wanted space.
* context.
*/ */
aggstate->tup_cxt = aggstate->aggcontext =
aggstate->csstate.cstate.cs_ExprContext->ecxt_per_tuple_memory;
aggstate->agg_cxt[0] =
AllocSetContextCreate(CurrentMemoryContext, AllocSetContextCreate(CurrentMemoryContext,
"AggExprContext1", "AggContext",
ALLOCSET_DEFAULT_MINSIZE, ALLOCSET_DEFAULT_MINSIZE,
ALLOCSET_DEFAULT_INITSIZE, ALLOCSET_DEFAULT_INITSIZE,
ALLOCSET_DEFAULT_MAXSIZE); ALLOCSET_DEFAULT_MAXSIZE);
aggstate->agg_cxt[1] =
AllocSetContextCreate(CurrentMemoryContext,
"AggExprContext2",
ALLOCSET_DEFAULT_MINSIZE,
ALLOCSET_DEFAULT_INITSIZE,
ALLOCSET_DEFAULT_MAXSIZE);
aggstate->which_cxt = 0;
#define AGG_NSLOTS 2 #define AGG_NSLOTS 2
...@@ -854,7 +1166,7 @@ ExecInitAgg(Agg *node, EState *estate, Plan *parent) ...@@ -854,7 +1166,7 @@ ExecInitAgg(Agg *node, EState *estate, Plan *parent)
ExecInitResultTupleSlot(estate, &aggstate->csstate.cstate); ExecInitResultTupleSlot(estate, &aggstate->csstate.cstate);
/* /*
* Set up aggregate-result storage in the expr context, and also * Set up aggregate-result storage in the output expr context, and also
* allocate my private per-agg working storage * allocate my private per-agg working storage
*/ */
econtext = aggstate->csstate.cstate.cs_ExprContext; econtext = aggstate->csstate.cstate.cs_ExprContext;
...@@ -867,6 +1179,20 @@ ExecInitAgg(Agg *node, EState *estate, Plan *parent) ...@@ -867,6 +1179,20 @@ ExecInitAgg(Agg *node, EState *estate, Plan *parent)
MemSet(peragg, 0, sizeof(AggStatePerAggData) * numaggs); MemSet(peragg, 0, sizeof(AggStatePerAggData) * numaggs);
aggstate->peragg = peragg; aggstate->peragg = peragg;
if (node->aggstrategy == AGG_HASHED)
{
build_hash_table(node);
aggstate->table_filled = false;
}
else
{
AggStatePerGroup pergroup;
pergroup = (AggStatePerGroup) palloc(sizeof(AggStatePerGroupData) * numaggs);
MemSet(pergroup, 0, sizeof(AggStatePerGroupData) * numaggs);
aggstate->pergroup = pergroup;
}
/* /*
* initialize child nodes * initialize child nodes
*/ */
...@@ -984,12 +1310,15 @@ ExecInitAgg(Agg *node, EState *estate, Plan *parent) ...@@ -984,12 +1310,15 @@ ExecInitAgg(Agg *node, EState *estate, Plan *parent)
{ {
/* /*
* Note: use the type from the input expression here, not from * Note: use the type from the input expression here, not from
* pg_proc.proargtypes, because the latter might be 0. * pg_proc.proargtypes, because the latter might be a pseudotype.
* (Consider COUNT(*).) * (Consider COUNT(*).)
*/ */
Oid inputType = exprType(aggref->target); Oid inputType = exprType(aggref->target);
Oid eq_function; Oid eq_function;
/* We don't implement DISTINCT aggs in the HASHED case */
Assert(node->aggstrategy != AGG_HASHED);
peraggstate->inputType = inputType; peraggstate->inputType = inputType;
get_typlenbyval(inputType, get_typlenbyval(inputType,
&peraggstate->inputtypeLen, &peraggstate->inputtypeLen,
...@@ -1055,21 +1384,27 @@ ExecEndAgg(Agg *node) ...@@ -1055,21 +1384,27 @@ ExecEndAgg(Agg *node)
{ {
AggState *aggstate = node->aggstate; AggState *aggstate = node->aggstate;
Plan *outerPlan; Plan *outerPlan;
int aggno;
/* Make sure we have closed any open tuplesorts */
for (aggno = 0; aggno < aggstate->numaggs; aggno++)
{
AggStatePerAgg peraggstate = &aggstate->peragg[aggno];
if (peraggstate->sortstate)
tuplesort_end(peraggstate->sortstate);
}
ExecFreeProjectionInfo(&aggstate->csstate.cstate); ExecFreeProjectionInfo(&aggstate->csstate.cstate);
/* /*
* Make sure ExecFreeExprContext() frees the right expr context... * Free both the expr contexts.
*/ */
aggstate->csstate.cstate.cs_ExprContext->ecxt_per_tuple_memory = ExecFreeExprContext(&aggstate->csstate.cstate);
aggstate->tup_cxt; aggstate->csstate.cstate.cs_ExprContext = aggstate->tmpcontext;
ExecFreeExprContext(&aggstate->csstate.cstate); ExecFreeExprContext(&aggstate->csstate.cstate);
/* MemoryContextDelete(aggstate->aggcontext);
* ... and I free the others.
*/
MemoryContextDelete(aggstate->agg_cxt[0]);
MemoryContextDelete(aggstate->agg_cxt[1]);
outerPlan = outerPlan(node); outerPlan = outerPlan(node);
ExecEndNode(outerPlan, (Plan *) node); ExecEndNode(outerPlan, (Plan *) node);
...@@ -1088,6 +1423,17 @@ ExecReScanAgg(Agg *node, ExprContext *exprCtxt, Plan *parent) ...@@ -1088,6 +1423,17 @@ ExecReScanAgg(Agg *node, ExprContext *exprCtxt, Plan *parent)
{ {
AggState *aggstate = node->aggstate; AggState *aggstate = node->aggstate;
ExprContext *econtext = aggstate->csstate.cstate.cs_ExprContext; ExprContext *econtext = aggstate->csstate.cstate.cs_ExprContext;
int aggno;
/* Make sure we have closed any open tuplesorts */
for (aggno = 0; aggno < aggstate->numaggs; aggno++)
{
AggStatePerAgg peraggstate = &aggstate->peragg[aggno];
if (peraggstate->sortstate)
tuplesort_end(peraggstate->sortstate);
peraggstate->sortstate = NULL;
}
aggstate->agg_done = false; aggstate->agg_done = false;
if (aggstate->grp_firstTuple != NULL) if (aggstate->grp_firstTuple != NULL)
...@@ -1098,6 +1444,14 @@ ExecReScanAgg(Agg *node, ExprContext *exprCtxt, Plan *parent) ...@@ -1098,6 +1444,14 @@ ExecReScanAgg(Agg *node, ExprContext *exprCtxt, Plan *parent)
MemSet(econtext->ecxt_aggvalues, 0, sizeof(Datum) * aggstate->numaggs); MemSet(econtext->ecxt_aggvalues, 0, sizeof(Datum) * aggstate->numaggs);
MemSet(econtext->ecxt_aggnulls, 0, sizeof(bool) * aggstate->numaggs); MemSet(econtext->ecxt_aggnulls, 0, sizeof(bool) * aggstate->numaggs);
MemoryContextReset(aggstate->aggcontext);
if (node->aggstrategy == AGG_HASHED)
{
build_hash_table(node);
aggstate->table_filled = false;
}
/* /*
* if chgParam of subnode is not null then plan will be re-scanned by * if chgParam of subnode is not null then plan will be re-scanned by
* first ExecProcNode. * first ExecProcNode.
......
...@@ -15,7 +15,7 @@ ...@@ -15,7 +15,7 @@
* locate group boundaries. * locate group boundaries.
* *
* IDENTIFICATION * IDENTIFICATION
* $Header: /cvsroot/pgsql/src/backend/executor/nodeGroup.c,v 1.48 2002/11/06 00:00:43 tgl Exp $ * $Header: /cvsroot/pgsql/src/backend/executor/nodeGroup.c,v 1.49 2002/11/06 22:31:23 tgl Exp $
* *
*------------------------------------------------------------------------- *-------------------------------------------------------------------------
*/ */
...@@ -151,9 +151,8 @@ ExecInitGroup(Group *node, EState *estate, Plan *parent) ...@@ -151,9 +151,8 @@ ExecInitGroup(Group *node, EState *estate, Plan *parent)
*/ */
grpstate = makeNode(GroupState); grpstate = makeNode(GroupState);
node->grpstate = grpstate; node->grpstate = grpstate;
grpstate->grp_useFirstTuple = FALSE;
grpstate->grp_done = FALSE;
grpstate->grp_firstTuple = NULL; grpstate->grp_firstTuple = NULL;
grpstate->grp_done = FALSE;
/* /*
* create expression context * create expression context
...@@ -236,7 +235,6 @@ ExecReScanGroup(Group *node, ExprContext *exprCtxt, Plan *parent) ...@@ -236,7 +235,6 @@ ExecReScanGroup(Group *node, ExprContext *exprCtxt, Plan *parent)
{ {
GroupState *grpstate = node->grpstate; GroupState *grpstate = node->grpstate;
grpstate->grp_useFirstTuple = FALSE;
grpstate->grp_done = FALSE; grpstate->grp_done = FALSE;
if (grpstate->grp_firstTuple != NULL) if (grpstate->grp_firstTuple != NULL)
{ {
......
...@@ -7,7 +7,8 @@ ...@@ -7,7 +7,8 @@
* Portions Copyright (c) 1994, Regents of the University of California * Portions Copyright (c) 1994, Regents of the University of California
* *
* *
* $Id: nodeHash.c,v 1.66 2002/09/04 20:31:18 momjian Exp $ * IDENTIFICATION
* $Header: /cvsroot/pgsql/src/backend/executor/nodeHash.c,v 1.67 2002/11/06 22:31:23 tgl Exp $
* *
*------------------------------------------------------------------------- *-------------------------------------------------------------------------
*/ */
...@@ -31,8 +32,6 @@ ...@@ -31,8 +32,6 @@
#include "utils/lsyscache.h" #include "utils/lsyscache.h"
static uint32 hashFunc(Datum key, int typLen, bool byVal);
/* ---------------------------------------------------------------- /* ----------------------------------------------------------------
* ExecHash * ExecHash
* *
...@@ -532,7 +531,7 @@ ExecHashGetBucket(HashJoinTable hashtable, ...@@ -532,7 +531,7 @@ ExecHashGetBucket(HashJoinTable hashtable,
/* /*
* We reset the eval context each time to reclaim any memory leaked in * We reset the eval context each time to reclaim any memory leaked in
* the hashkey expression or hashFunc itself. * the hashkey expression or ComputeHashFunc itself.
*/ */
ResetExprContext(econtext); ResetExprContext(econtext);
...@@ -550,9 +549,9 @@ ExecHashGetBucket(HashJoinTable hashtable, ...@@ -550,9 +549,9 @@ ExecHashGetBucket(HashJoinTable hashtable,
bucketno = 0; bucketno = 0;
else else
{ {
bucketno = hashFunc(keyval, bucketno = ComputeHashFunc(keyval,
(int) hashtable->typLen, (int) hashtable->typLen,
hashtable->typByVal) hashtable->typByVal)
% (uint32) hashtable->totalbuckets; % (uint32) hashtable->totalbuckets;
} }
...@@ -622,16 +621,16 @@ ExecScanHashBucket(HashJoinState *hjstate, ...@@ -622,16 +621,16 @@ ExecScanHashBucket(HashJoinState *hjstate,
} }
/* ---------------------------------------------------------------- /* ----------------------------------------------------------------
* hashFunc * ComputeHashFunc
* *
* the hash function for hash joins * the hash function for hash joins (also used for hash aggregation)
* *
* XXX this probably ought to be replaced with datatype-specific * XXX this probably ought to be replaced with datatype-specific
* hash functions, such as those already implemented for hash indexes. * hash functions, such as those already implemented for hash indexes.
* ---------------------------------------------------------------- * ----------------------------------------------------------------
*/ */
static uint32 uint32
hashFunc(Datum key, int typLen, bool byVal) ComputeHashFunc(Datum key, int typLen, bool byVal)
{ {
unsigned char *k; unsigned char *k;
...@@ -681,7 +680,7 @@ hashFunc(Datum key, int typLen, bool byVal) ...@@ -681,7 +680,7 @@ hashFunc(Datum key, int typLen, bool byVal)
} }
else else
{ {
elog(ERROR, "hashFunc: Invalid typLen %d", typLen); elog(ERROR, "ComputeHashFunc: Invalid typLen %d", typLen);
k = NULL; /* keep compiler quiet */ k = NULL; /* keep compiler quiet */
} }
} }
......
...@@ -15,7 +15,7 @@ ...@@ -15,7 +15,7 @@
* Portions Copyright (c) 1994, Regents of the University of California * Portions Copyright (c) 1994, Regents of the University of California
* *
* IDENTIFICATION * IDENTIFICATION
* $Header: /cvsroot/pgsql/src/backend/nodes/copyfuncs.c,v 1.215 2002/11/06 00:00:43 tgl Exp $ * $Header: /cvsroot/pgsql/src/backend/nodes/copyfuncs.c,v 1.216 2002/11/06 22:31:23 tgl Exp $
* *
*------------------------------------------------------------------------- *-------------------------------------------------------------------------
*/ */
...@@ -524,6 +524,7 @@ _copyAgg(Agg *from) ...@@ -524,6 +524,7 @@ _copyAgg(Agg *from)
memcpy(newnode->grpColIdx, from->grpColIdx, memcpy(newnode->grpColIdx, from->grpColIdx,
from->numCols * sizeof(AttrNumber)); from->numCols * sizeof(AttrNumber));
} }
newnode->numGroups = from->numGroups;
return newnode; return newnode;
} }
......
...@@ -5,7 +5,7 @@ ...@@ -5,7 +5,7 @@
* Portions Copyright (c) 1996-2002, PostgreSQL Global Development Group * Portions Copyright (c) 1996-2002, PostgreSQL Global Development Group
* Portions Copyright (c) 1994, Regents of the University of California * Portions Copyright (c) 1994, Regents of the University of California
* *
* $Header: /cvsroot/pgsql/src/backend/nodes/outfuncs.c,v 1.177 2002/11/06 00:00:44 tgl Exp $ * $Header: /cvsroot/pgsql/src/backend/nodes/outfuncs.c,v 1.178 2002/11/06 22:31:24 tgl Exp $
* *
* NOTES * NOTES
* Every (plan) node in POSTGRES has an associated "out" routine which * Every (plan) node in POSTGRES has an associated "out" routine which
...@@ -597,8 +597,8 @@ _outAgg(StringInfo str, Agg *node) ...@@ -597,8 +597,8 @@ _outAgg(StringInfo str, Agg *node)
{ {
appendStringInfo(str, " AGG "); appendStringInfo(str, " AGG ");
_outPlanInfo(str, (Plan *) node); _outPlanInfo(str, (Plan *) node);
appendStringInfo(str, " :aggstrategy %d :numCols %d ", appendStringInfo(str, " :aggstrategy %d :numCols %d :numGroups %ld ",
(int) node->aggstrategy, node->numCols); (int) node->aggstrategy, node->numCols, node->numGroups);
} }
static void static void
......
...@@ -10,7 +10,7 @@ ...@@ -10,7 +10,7 @@
* *
* *
* IDENTIFICATION * IDENTIFICATION
* $Header: /cvsroot/pgsql/src/backend/optimizer/plan/createplan.c,v 1.120 2002/11/06 00:00:44 tgl Exp $ * $Header: /cvsroot/pgsql/src/backend/optimizer/plan/createplan.c,v 1.121 2002/11/06 22:31:24 tgl Exp $
* *
*------------------------------------------------------------------------- *-------------------------------------------------------------------------
*/ */
...@@ -1675,6 +1675,7 @@ make_agg(List *tlist, List *qual, AggStrategy aggstrategy, ...@@ -1675,6 +1675,7 @@ make_agg(List *tlist, List *qual, AggStrategy aggstrategy,
plan->plan_rows *= 0.1; plan->plan_rows *= 0.1;
if (plan->plan_rows < 1) if (plan->plan_rows < 1)
plan->plan_rows = 1; plan->plan_rows = 1;
node->numGroups = (long) plan->plan_rows;
} }
plan->state = (EState *) NULL; plan->state = (EState *) NULL;
......
...@@ -8,7 +8,7 @@ ...@@ -8,7 +8,7 @@
* *
* *
* IDENTIFICATION * IDENTIFICATION
* $Header: /cvsroot/pgsql/src/backend/optimizer/plan/planner.c,v 1.126 2002/11/06 00:00:44 tgl Exp $ * $Header: /cvsroot/pgsql/src/backend/optimizer/plan/planner.c,v 1.127 2002/11/06 22:31:24 tgl Exp $
* *
*------------------------------------------------------------------------- *-------------------------------------------------------------------------
*/ */
...@@ -931,6 +931,7 @@ grouping_planner(Query *parse, double tuple_fraction) ...@@ -931,6 +931,7 @@ grouping_planner(Query *parse, double tuple_fraction)
AttrNumber *groupColIdx = NULL; AttrNumber *groupColIdx = NULL;
Path *cheapest_path; Path *cheapest_path;
Path *sorted_path; Path *sorted_path;
bool use_hashed_grouping = false;
/* Preprocess targetlist in case we are inside an INSERT/UPDATE. */ /* Preprocess targetlist in case we are inside an INSERT/UPDATE. */
tlist = preprocess_targetlist(tlist, tlist = preprocess_targetlist(tlist,
...@@ -1209,6 +1210,29 @@ grouping_planner(Query *parse, double tuple_fraction) ...@@ -1209,6 +1210,29 @@ grouping_planner(Query *parse, double tuple_fraction)
group_pathkeys = canonicalize_pathkeys(parse, group_pathkeys); group_pathkeys = canonicalize_pathkeys(parse, group_pathkeys);
sort_pathkeys = canonicalize_pathkeys(parse, sort_pathkeys); sort_pathkeys = canonicalize_pathkeys(parse, sort_pathkeys);
/*
* Consider whether we might want to use hashed grouping.
*/
if (parse->groupClause)
{
/*
* Executor doesn't support hashed aggregation with DISTINCT
* aggregates. (Doing so would imply storing *all* the input
* values in the hash table, which seems like a certain loser.)
*/
if (parse->hasAggs &&
(contain_distinct_agg_clause((Node *) tlist) ||
contain_distinct_agg_clause(parse->havingQual)))
use_hashed_grouping = false;
else
{
#if 0 /* much more to do here */
/* TEMPORARY HOTWIRE FOR TESTING */
use_hashed_grouping = true;
#endif
}
}
/* /*
* Select the best path and create a plan to execute it. * Select the best path and create a plan to execute it.
* *
...@@ -1279,22 +1303,30 @@ grouping_planner(Query *parse, double tuple_fraction) ...@@ -1279,22 +1303,30 @@ grouping_planner(Query *parse, double tuple_fraction)
} }
/* /*
* If any aggregate is present, insert the Agg node, plus an explicit * Insert AGG or GROUP node if needed, plus an explicit sort step
* sort if necessary. * if necessary.
* *
* HAVING clause, if any, becomes qual of the Agg node * HAVING clause, if any, becomes qual of the Agg node
*/ */
if (parse->hasAggs) if (use_hashed_grouping)
{ {
/* Hashed aggregate plan --- no sort needed */
result_plan = (Plan *) make_agg(tlist,
(List *) parse->havingQual,
AGG_HASHED,
length(parse->groupClause),
groupColIdx,
result_plan);
/* Hashed aggregation produces randomly-ordered results */
current_pathkeys = NIL;
}
else if (parse->hasAggs)
{
/* Plain aggregate plan --- sort if needed */
AggStrategy aggstrategy; AggStrategy aggstrategy;
if (parse->groupClause) if (parse->groupClause)
{ {
aggstrategy = AGG_SORTED;
/*
* Add an explicit sort if we couldn't make the path come out
* the way the AGG node needs it.
*/
if (!pathkeys_contained_in(group_pathkeys, current_pathkeys)) if (!pathkeys_contained_in(group_pathkeys, current_pathkeys))
{ {
result_plan = make_groupsortplan(parse, result_plan = make_groupsortplan(parse,
...@@ -1303,9 +1335,18 @@ grouping_planner(Query *parse, double tuple_fraction) ...@@ -1303,9 +1335,18 @@ grouping_planner(Query *parse, double tuple_fraction)
result_plan); result_plan);
current_pathkeys = group_pathkeys; current_pathkeys = group_pathkeys;
} }
aggstrategy = AGG_SORTED;
/*
* The AGG node will not change the sort ordering of its
* groups, so current_pathkeys describes the result too.
*/
} }
else else
{
aggstrategy = AGG_PLAIN; aggstrategy = AGG_PLAIN;
/* Result will be only one row anyway; no sort order */
current_pathkeys = NIL;
}
result_plan = (Plan *) make_agg(tlist, result_plan = (Plan *) make_agg(tlist,
(List *) parse->havingQual, (List *) parse->havingQual,
...@@ -1313,10 +1354,6 @@ grouping_planner(Query *parse, double tuple_fraction) ...@@ -1313,10 +1354,6 @@ grouping_planner(Query *parse, double tuple_fraction)
length(parse->groupClause), length(parse->groupClause),
groupColIdx, groupColIdx,
result_plan); result_plan);
/*
* Note: plain or grouped Agg does not affect any existing
* sort order of the tuples
*/
} }
else else
{ {
......
...@@ -8,7 +8,7 @@ ...@@ -8,7 +8,7 @@
* *
* *
* IDENTIFICATION * IDENTIFICATION
* $Header: /cvsroot/pgsql/src/backend/optimizer/util/clauses.c,v 1.109 2002/09/11 14:48:54 tgl Exp $ * $Header: /cvsroot/pgsql/src/backend/optimizer/util/clauses.c,v 1.110 2002/11/06 22:31:24 tgl Exp $
* *
* HISTORY * HISTORY
* AUTHOR DATE MAJOR EVENT * AUTHOR DATE MAJOR EVENT
...@@ -46,6 +46,7 @@ typedef struct ...@@ -46,6 +46,7 @@ typedef struct
} check_subplans_for_ungrouped_vars_context; } check_subplans_for_ungrouped_vars_context;
static bool contain_agg_clause_walker(Node *node, void *context); static bool contain_agg_clause_walker(Node *node, void *context);
static bool contain_distinct_agg_clause_walker(Node *node, void *context);
static bool pull_agg_clause_walker(Node *node, List **listptr); static bool pull_agg_clause_walker(Node *node, List **listptr);
static bool expression_returns_set_walker(Node *node, void *context); static bool expression_returns_set_walker(Node *node, void *context);
static bool contain_subplans_walker(Node *node, void *context); static bool contain_subplans_walker(Node *node, void *context);
...@@ -410,6 +411,32 @@ contain_agg_clause_walker(Node *node, void *context) ...@@ -410,6 +411,32 @@ contain_agg_clause_walker(Node *node, void *context)
return expression_tree_walker(node, contain_agg_clause_walker, context); return expression_tree_walker(node, contain_agg_clause_walker, context);
} }
/*
 * contain_distinct_agg_clause
 *	  Search a clause for Aggref nodes that are marked DISTINCT.
 *
 * Returns true as soon as one DISTINCT aggregate is found, false if the
 * clause contains none.  The search itself is carried out by
 * contain_distinct_agg_clause_walker, which needs no context data.
 */
bool
contain_distinct_agg_clause(Node *clause)
{
	void	   *no_context = NULL;

	return contain_distinct_agg_clause_walker(clause, no_context);
}
/*
 * Walker for contain_distinct_agg_clause: returns true for an Aggref node
 * whose aggdistinct flag is set, otherwise continues the search through
 * expression_tree_walker.  The context argument is passed along unchanged.
 */
static bool
contain_distinct_agg_clause_walker(Node *node, void *context)
{
	/* an empty subtree cannot contain a DISTINCT aggregate */
	if (node == NULL)
		return false;

	/* found one: abort the tree traversal and return true */
	if (IsA(node, Aggref) && ((Aggref *) node)->aggdistinct)
		return true;

	return expression_tree_walker(node,
								  contain_distinct_agg_clause_walker,
								  context);
}
/* /*
* pull_agg_clause * pull_agg_clause
* Recursively pulls all Aggref nodes from an expression tree. * Recursively pulls all Aggref nodes from an expression tree.
......
...@@ -7,7 +7,7 @@ ...@@ -7,7 +7,7 @@
* Portions Copyright (c) 1996-2002, PostgreSQL Global Development Group * Portions Copyright (c) 1996-2002, PostgreSQL Global Development Group
* Portions Copyright (c) 1994, Regents of the University of California * Portions Copyright (c) 1994, Regents of the University of California
* *
* $Id: nodeHash.h,v 1.24 2002/06/20 20:29:49 momjian Exp $ * $Id: nodeHash.h,v 1.25 2002/11/06 22:31:24 tgl Exp $
* *
*------------------------------------------------------------------------- *-------------------------------------------------------------------------
*/ */
...@@ -36,5 +36,6 @@ extern void ExecChooseHashTableSize(double ntuples, int tupwidth, ...@@ -36,5 +36,6 @@ extern void ExecChooseHashTableSize(double ntuples, int tupwidth,
int *virtualbuckets, int *virtualbuckets,
int *physicalbuckets, int *physicalbuckets,
int *numbatches); int *numbatches);
extern uint32 ComputeHashFunc(Datum key, int typLen, bool byVal);
#endif /* NODEHASH_H */ #endif /* NODEHASH_H */
...@@ -7,7 +7,7 @@ ...@@ -7,7 +7,7 @@
* Portions Copyright (c) 1996-2002, PostgreSQL Global Development Group * Portions Copyright (c) 1996-2002, PostgreSQL Global Development Group
* Portions Copyright (c) 1994, Regents of the University of California * Portions Copyright (c) 1994, Regents of the University of California
* *
* $Id: execnodes.h,v 1.76 2002/11/06 00:00:44 tgl Exp $ * $Id: execnodes.h,v 1.77 2002/11/06 22:31:24 tgl Exp $
* *
*------------------------------------------------------------------------- *-------------------------------------------------------------------------
*/ */
...@@ -661,12 +661,18 @@ typedef struct MaterialState ...@@ -661,12 +661,18 @@ typedef struct MaterialState
* *
* csstate.css_ScanTupleSlot refers to output of underlying plan. * csstate.css_ScanTupleSlot refers to output of underlying plan.
* *
* Note: the associated ExprContext contains ecxt_aggvalues and ecxt_aggnulls * Note: csstate.cstate.cs_ExprContext contains ecxt_aggvalues and
* arrays, which hold the computed agg values for the current input group * ecxt_aggnulls arrays, which hold the computed agg values for the current
* during evaluation of an Agg node's output tuple(s). * input group during evaluation of an Agg node's output tuple(s). We
* create a second ExprContext, tmpcontext, in which to evaluate input
* expressions and run the aggregate transition functions.
* ------------------------- * -------------------------
*/ */
typedef struct AggStatePerAggData *AggStatePerAgg; /* private in nodeAgg.c */ /* these structs are private in nodeAgg.c: */
typedef struct AggStatePerAggData *AggStatePerAgg;
typedef struct AggStatePerGroupData *AggStatePerGroup;
typedef struct AggHashEntryData *AggHashEntry;
typedef struct AggHashTableData *AggHashTable;
typedef struct AggState typedef struct AggState
{ {
...@@ -674,13 +680,18 @@ typedef struct AggState ...@@ -674,13 +680,18 @@ typedef struct AggState
List *aggs; /* all Aggref nodes in targetlist & quals */ List *aggs; /* all Aggref nodes in targetlist & quals */
int numaggs; /* length of list (could be zero!) */ int numaggs; /* length of list (could be zero!) */
FmgrInfo *eqfunctions; /* per-grouping-field equality fns */ FmgrInfo *eqfunctions; /* per-grouping-field equality fns */
HeapTuple grp_firstTuple; /* copy of first tuple of current group */ AggStatePerAgg peragg; /* per-Aggref information */
AggStatePerAgg peragg; /* per-Aggref working state */ MemoryContext aggcontext; /* memory context for long-lived data */
MemoryContext tup_cxt; /* context for per-output-tuple ExprContext *tmpcontext; /* econtext for input expressions */
* expressions */
MemoryContext agg_cxt[2]; /* pair of expression eval memory contexts */
int which_cxt; /* 0 or 1, indicates current agg_cxt */
bool agg_done; /* indicates completion of Agg scan */ bool agg_done; /* indicates completion of Agg scan */
/* these fields are used in AGG_PLAIN and AGG_SORTED modes: */
AggStatePerGroup pergroup; /* per-Aggref-per-group working state */
HeapTuple grp_firstTuple; /* copy of first tuple of current group */
/* these fields are used in AGG_HASHED mode: */
AggHashTable hashtable; /* hash table with one entry per group */
bool table_filled; /* hash table filled yet? */
AggHashEntry next_hash_entry; /* next entry in current chain */
int next_hash_bucket; /* next chain */
} AggState; } AggState;
/* --------------------- /* ---------------------
...@@ -691,9 +702,8 @@ typedef struct GroupState ...@@ -691,9 +702,8 @@ typedef struct GroupState
{ {
CommonScanState csstate; /* its first field is NodeTag */ CommonScanState csstate; /* its first field is NodeTag */
FmgrInfo *eqfunctions; /* per-field lookup data for equality fns */ FmgrInfo *eqfunctions; /* per-field lookup data for equality fns */
bool grp_useFirstTuple; /* first tuple not processed yet */
bool grp_done;
HeapTuple grp_firstTuple; /* copy of first tuple of current group */ HeapTuple grp_firstTuple; /* copy of first tuple of current group */
bool grp_done; /* indicates completion of Group scan */
} GroupState; } GroupState;
/* ---------------- /* ----------------
......
...@@ -7,7 +7,7 @@ ...@@ -7,7 +7,7 @@
* Portions Copyright (c) 1996-2002, PostgreSQL Global Development Group * Portions Copyright (c) 1996-2002, PostgreSQL Global Development Group
* Portions Copyright (c) 1994, Regents of the University of California * Portions Copyright (c) 1994, Regents of the University of California
* *
* $Id: plannodes.h,v 1.59 2002/11/06 00:00:44 tgl Exp $ * $Id: plannodes.h,v 1.60 2002/11/06 22:31:24 tgl Exp $
* *
*------------------------------------------------------------------------- *-------------------------------------------------------------------------
*/ */
...@@ -349,6 +349,7 @@ typedef struct Agg ...@@ -349,6 +349,7 @@ typedef struct Agg
AggStrategy aggstrategy; AggStrategy aggstrategy;
int numCols; /* number of grouping columns */ int numCols; /* number of grouping columns */
AttrNumber *grpColIdx; /* their indexes in the target list */ AttrNumber *grpColIdx; /* their indexes in the target list */
long numGroups; /* estimated number of groups in input */
AggState *aggstate; AggState *aggstate;
} Agg; } Agg;
......
...@@ -7,7 +7,7 @@ ...@@ -7,7 +7,7 @@
* Portions Copyright (c) 1996-2002, PostgreSQL Global Development Group * Portions Copyright (c) 1996-2002, PostgreSQL Global Development Group
* Portions Copyright (c) 1994, Regents of the University of California * Portions Copyright (c) 1994, Regents of the University of California
* *
* $Id: clauses.h,v 1.54 2002/09/11 14:48:55 tgl Exp $ * $Id: clauses.h,v 1.55 2002/11/06 22:31:24 tgl Exp $
* *
*------------------------------------------------------------------------- *-------------------------------------------------------------------------
*/ */
...@@ -40,6 +40,7 @@ extern Expr *make_ands_explicit(List *andclauses); ...@@ -40,6 +40,7 @@ extern Expr *make_ands_explicit(List *andclauses);
extern List *make_ands_implicit(Expr *clause); extern List *make_ands_implicit(Expr *clause);
extern bool contain_agg_clause(Node *clause); extern bool contain_agg_clause(Node *clause);
extern bool contain_distinct_agg_clause(Node *clause);
extern List *pull_agg_clause(Node *clause); extern List *pull_agg_clause(Node *clause);
extern bool expression_returns_set(Node *clause); extern bool expression_returns_set(Node *clause);
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment