Commit a7de3dc5 authored by Robert Haas's avatar Robert Haas

Support multi-stage aggregation.

Aggregate nodes now have two new modes: a "partial" mode where they
output the unfinalized transition state, and a "finalize" mode where
they accept unfinalized transition states rather than individual
values as input.

These new modes are not used anywhere yet, but they will be necessary
for parallel aggregation.  The infrastructure also figures to be
useful for cases where we want to aggregate local data and remote
data via the FDW interface, and want to bring back partial aggregates
from the remote side that can then be combined with locally generated
partial aggregates to produce the final value.  It may also be useful
even when neither FDWs nor parallelism are in play, as explained in
the comments in nodeAgg.c.

David Rowley and Simon Riggs, reviewed by KaiGai Kohei, Heikki
Linnakangas, Haribabu Kommi, and me.
parent c8642d90
......@@ -27,6 +27,7 @@ CREATE AGGREGATE <replaceable class="parameter">name</replaceable> ( [ <replacea
[ , SSPACE = <replaceable class="PARAMETER">state_data_size</replaceable> ]
[ , FINALFUNC = <replaceable class="PARAMETER">ffunc</replaceable> ]
[ , FINALFUNC_EXTRA ]
[ , COMBINEFUNC = <replaceable class="PARAMETER">combinefunc</replaceable> ]
[ , INITCOND = <replaceable class="PARAMETER">initial_condition</replaceable> ]
[ , MSFUNC = <replaceable class="PARAMETER">msfunc</replaceable> ]
[ , MINVFUNC = <replaceable class="PARAMETER">minvfunc</replaceable> ]
......@@ -45,6 +46,7 @@ CREATE AGGREGATE <replaceable class="parameter">name</replaceable> ( [ [ <replac
[ , SSPACE = <replaceable class="PARAMETER">state_data_size</replaceable> ]
[ , FINALFUNC = <replaceable class="PARAMETER">ffunc</replaceable> ]
[ , FINALFUNC_EXTRA ]
[ , COMBINEFUNC = <replaceable class="PARAMETER">combinefunc</replaceable> ]
[ , INITCOND = <replaceable class="PARAMETER">initial_condition</replaceable> ]
[ , HYPOTHETICAL ]
)
......@@ -58,6 +60,7 @@ CREATE AGGREGATE <replaceable class="PARAMETER">name</replaceable> (
[ , SSPACE = <replaceable class="PARAMETER">state_data_size</replaceable> ]
[ , FINALFUNC = <replaceable class="PARAMETER">ffunc</replaceable> ]
[ , FINALFUNC_EXTRA ]
[ , COMBINEFUNC = <replaceable class="PARAMETER">combinefunc</replaceable> ]
[ , INITCOND = <replaceable class="PARAMETER">initial_condition</replaceable> ]
[ , MSFUNC = <replaceable class="PARAMETER">msfunc</replaceable> ]
[ , MINVFUNC = <replaceable class="PARAMETER">minvfunc</replaceable> ]
......@@ -105,12 +108,15 @@ CREATE AGGREGATE <replaceable class="PARAMETER">name</replaceable> (
functions:
a state transition function
<replaceable class="PARAMETER">sfunc</replaceable>,
and an optional final calculation function
<replaceable class="PARAMETER">ffunc</replaceable>.
an optional final calculation function
<replaceable class="PARAMETER">ffunc</replaceable>,
and an optional combine function
<replaceable class="PARAMETER">combinefunc</replaceable>.
These are used as follows:
<programlisting>
<replaceable class="PARAMETER">sfunc</replaceable>( internal-state, next-data-values ) ---> next-internal-state
<replaceable class="PARAMETER">ffunc</replaceable>( internal-state ) ---> aggregate-value
<replaceable class="PARAMETER">combinefunc</replaceable>( internal-state, internal-state ) ---> next-internal-state
</programlisting>
</para>
......@@ -127,6 +133,12 @@ CREATE AGGREGATE <replaceable class="PARAMETER">name</replaceable> (
is returned as-is.
</para>
<para>
An aggregate function may also supply a combining function, which allows
the aggregation process to be broken down into multiple steps. This
facilitates query optimization techniques such as parallel query.
</para>
<para>
An aggregate function can provide an initial condition,
that is, an initial value for the internal state value.
......
......@@ -57,6 +57,7 @@ AggregateCreate(const char *aggName,
Oid variadicArgType,
List *aggtransfnName,
List *aggfinalfnName,
List *aggcombinefnName,
List *aggmtransfnName,
List *aggminvtransfnName,
List *aggmfinalfnName,
......@@ -77,6 +78,7 @@ AggregateCreate(const char *aggName,
Form_pg_proc proc;
Oid transfn;
Oid finalfn = InvalidOid; /* can be omitted */
Oid combinefn = InvalidOid; /* can be omitted */
Oid mtransfn = InvalidOid; /* can be omitted */
Oid minvtransfn = InvalidOid; /* can be omitted */
Oid mfinalfn = InvalidOid; /* can be omitted */
......@@ -396,6 +398,30 @@ AggregateCreate(const char *aggName,
}
Assert(OidIsValid(finaltype));
/* handle the combinefn, if supplied */
if (aggcombinefnName)
{
Oid combineType;
/*
* Combine function must have 2 argument, each of which is the
* trans type
*/
fnArgs[0] = aggTransType;
fnArgs[1] = aggTransType;
combinefn = lookup_agg_function(aggcombinefnName, 2, fnArgs,
variadicArgType, &combineType);
/* Ensure the return type matches the aggregates trans type */
if (combineType != aggTransType)
ereport(ERROR,
(errcode(ERRCODE_DATATYPE_MISMATCH),
errmsg("return type of combine function %s is not %s",
NameListToString(aggcombinefnName),
format_type_be(aggTransType))));
}
/*
* If finaltype (i.e. aggregate return type) is polymorphic, inputs must
* be polymorphic also, else parser will fail to deduce result type.
......@@ -567,6 +593,7 @@ AggregateCreate(const char *aggName,
values[Anum_pg_aggregate_aggnumdirectargs - 1] = Int16GetDatum(numDirectArgs);
values[Anum_pg_aggregate_aggtransfn - 1] = ObjectIdGetDatum(transfn);
values[Anum_pg_aggregate_aggfinalfn - 1] = ObjectIdGetDatum(finalfn);
values[Anum_pg_aggregate_aggcombinefn - 1] = ObjectIdGetDatum(combinefn);
values[Anum_pg_aggregate_aggmtransfn - 1] = ObjectIdGetDatum(mtransfn);
values[Anum_pg_aggregate_aggminvtransfn - 1] = ObjectIdGetDatum(minvtransfn);
values[Anum_pg_aggregate_aggmfinalfn - 1] = ObjectIdGetDatum(mfinalfn);
......@@ -618,6 +645,15 @@ AggregateCreate(const char *aggName,
recordDependencyOn(&myself, &referenced, DEPENDENCY_NORMAL);
}
/* Depends on combine function, if any */
if (OidIsValid(combinefn))
{
referenced.classId = ProcedureRelationId;
referenced.objectId = combinefn;
referenced.objectSubId = 0;
recordDependencyOn(&myself, &referenced, DEPENDENCY_NORMAL);
}
/* Depends on forward transition function, if any */
if (OidIsValid(mtransfn))
{
......@@ -659,7 +695,7 @@ AggregateCreate(const char *aggName,
/*
* lookup_agg_function
* common code for finding transfn, invtransfn and finalfn
* common code for finding transfn, invtransfn, finalfn, and combinefn
*
* Returns OID of function, and stores its return type into *rettype
*
......
......@@ -61,6 +61,7 @@ DefineAggregate(List *name, List *args, bool oldstyle, List *parameters,
char aggKind = AGGKIND_NORMAL;
List *transfuncName = NIL;
List *finalfuncName = NIL;
List *combinefuncName = NIL;
List *mtransfuncName = NIL;
List *minvtransfuncName = NIL;
List *mfinalfuncName = NIL;
......@@ -124,6 +125,8 @@ DefineAggregate(List *name, List *args, bool oldstyle, List *parameters,
transfuncName = defGetQualifiedName(defel);
else if (pg_strcasecmp(defel->defname, "finalfunc") == 0)
finalfuncName = defGetQualifiedName(defel);
else if (pg_strcasecmp(defel->defname, "combinefunc") == 0)
combinefuncName = defGetQualifiedName(defel);
else if (pg_strcasecmp(defel->defname, "msfunc") == 0)
mtransfuncName = defGetQualifiedName(defel);
else if (pg_strcasecmp(defel->defname, "minvfunc") == 0)
......@@ -383,6 +386,7 @@ DefineAggregate(List *name, List *args, bool oldstyle, List *parameters,
variadicArgType,
transfuncName, /* step function name */
finalfuncName, /* final function name */
combinefuncName, /* combine function name */
mtransfuncName, /* fwd trans function name */
minvtransfuncName, /* inv trans function name */
mfinalfuncName, /* final function name */
......
......@@ -909,7 +909,15 @@ ExplainNode(PlanState *planstate, List *ancestors,
break;
case T_Agg:
sname = "Aggregate";
switch (((Agg *) plan)->aggstrategy)
{
Agg *agg = (Agg *) plan;
if (agg->finalizeAggs == false)
operation = "Partial";
else if (agg->combineStates == true)
operation = "Finalize";
switch (agg->aggstrategy)
{
case AGG_PLAIN:
pname = "Aggregate";
......@@ -928,6 +936,10 @@ ExplainNode(PlanState *planstate, List *ancestors,
strategy = "???";
break;
}
if (operation != NULL)
pname = psprintf("%s %s", operation, pname);
}
break;
case T_WindowAgg:
pname = sname = "WindowAgg";
......
This diff is collapsed.
......@@ -865,6 +865,8 @@ _copyAgg(const Agg *from)
COPY_SCALAR_FIELD(aggstrategy);
COPY_SCALAR_FIELD(numCols);
COPY_SCALAR_FIELD(combineStates);
COPY_SCALAR_FIELD(finalizeAggs);
if (from->numCols > 0)
{
COPY_POINTER_FIELD(grpColIdx, from->numCols * sizeof(AttrNumber));
......
......@@ -695,6 +695,9 @@ _outAgg(StringInfo str, const Agg *node)
for (i = 0; i < node->numCols; i++)
appendStringInfo(str, " %d", node->grpColIdx[i]);
WRITE_BOOL_FIELD(combineStates);
WRITE_BOOL_FIELD(finalizeAggs);
appendStringInfoString(str, " :grpOperators");
for (i = 0; i < node->numCols; i++)
appendStringInfo(str, " %u", node->grpOperators[i]);
......
......@@ -1989,6 +1989,8 @@ _readAgg(void)
READ_ENUM_FIELD(aggstrategy, AggStrategy);
READ_INT_FIELD(numCols);
READ_ATTRNUMBER_ARRAY(grpColIdx, local_node->numCols);
READ_BOOL_FIELD(combineStates);
READ_BOOL_FIELD(finalizeAggs);
READ_OID_ARRAY(grpOperators, local_node->numCols);
READ_LONG_FIELD(numGroups);
READ_NODE_FIELD(groupingSets);
......
......@@ -1054,6 +1054,8 @@ create_unique_plan(PlannerInfo *root, UniquePath *best_path)
groupOperators,
NIL,
numGroups,
false,
true,
subplan);
}
else
......@@ -4557,9 +4559,8 @@ Agg *
make_agg(PlannerInfo *root, List *tlist, List *qual,
AggStrategy aggstrategy, const AggClauseCosts *aggcosts,
int numGroupCols, AttrNumber *grpColIdx, Oid *grpOperators,
List *groupingSets,
long numGroups,
Plan *lefttree)
List *groupingSets, long numGroups, bool combineStates,
bool finalizeAggs, Plan *lefttree)
{
Agg *node = makeNode(Agg);
Plan *plan = &node->plan;
......@@ -4568,6 +4569,8 @@ make_agg(PlannerInfo *root, List *tlist, List *qual,
node->aggstrategy = aggstrategy;
node->numCols = numGroupCols;
node->combineStates = combineStates;
node->finalizeAggs = finalizeAggs;
node->grpColIdx = grpColIdx;
node->grpOperators = grpOperators;
node->numGroups = numGroups;
......
......@@ -2005,6 +2005,8 @@ grouping_planner(PlannerInfo *root, double tuple_fraction)
extract_grouping_ops(parse->groupClause),
NIL,
numGroups,
false,
true,
result_plan);
/* Hashed aggregation produces randomly-ordered results */
current_pathkeys = NIL;
......@@ -2312,6 +2314,8 @@ grouping_planner(PlannerInfo *root, double tuple_fraction)
extract_grouping_ops(parse->distinctClause),
NIL,
numDistinctRows,
false,
true,
result_plan);
/* Hashed aggregation produces randomly-ordered results */
current_pathkeys = NIL;
......@@ -2549,6 +2553,8 @@ build_grouping_chain(PlannerInfo *root,
extract_grouping_ops(groupClause),
gsets,
numGroups,
false,
true,
sort_plan);
/*
......@@ -2588,6 +2594,8 @@ build_grouping_chain(PlannerInfo *root,
extract_grouping_ops(groupClause),
gsets,
numGroups,
false,
true,
result_plan);
((Agg *) result_plan)->chain = chain;
......
......@@ -775,6 +775,8 @@ make_union_unique(SetOperationStmt *op, Plan *plan,
extract_grouping_ops(groupList),
NIL,
numGroups,
false,
true,
plan);
/* Hashed aggregation produces randomly-ordered results */
*sortClauses = NIL;
......
......@@ -1927,6 +1927,42 @@ build_aggregate_transfn_expr(Oid *agg_input_types,
}
}
/*
* Like build_aggregate_transfn_expr, but creates an expression tree for the
* combine function of an aggregate, rather than the transition function.
*/
void
build_aggregate_combinefn_expr(Oid agg_state_type,
Oid agg_input_collation,
Oid combinefn_oid,
Expr **combinefnexpr)
{
Param *argp;
List *args;
FuncExpr *fexpr;
/* Build arg list to use in the combinefn FuncExpr node. */
argp = makeNode(Param);
argp->paramkind = PARAM_EXEC;
argp->paramid = -1;
argp->paramtype = agg_state_type;
argp->paramtypmod = -1;
argp->paramcollid = agg_input_collation;
argp->location = -1;
/* transition state type is arg 1 and 2 */
args = list_make2(argp, argp);
fexpr = makeFuncExpr(combinefn_oid,
agg_state_type,
args,
InvalidOid,
agg_input_collation,
COERCE_EXPLICIT_CALL);
fexpr->funcvariadic = false;
*combinefnexpr = (Expr *) fexpr;
}
/*
* Like build_aggregate_transfn_expr, but creates an expression tree for the
* final function of an aggregate, rather than the transition function.
......
......@@ -12383,6 +12383,7 @@ dumpAgg(Archive *fout, AggInfo *agginfo)
PGresult *res;
int i_aggtransfn;
int i_aggfinalfn;
int i_aggcombinefn;
int i_aggmtransfn;
int i_aggminvtransfn;
int i_aggmfinalfn;
......@@ -12399,6 +12400,7 @@ dumpAgg(Archive *fout, AggInfo *agginfo)
int i_convertok;
const char *aggtransfn;
const char *aggfinalfn;
const char *aggcombinefn;
const char *aggmtransfn;
const char *aggminvtransfn;
const char *aggmfinalfn;
......@@ -12429,7 +12431,26 @@ dumpAgg(Archive *fout, AggInfo *agginfo)
selectSourceSchema(fout, agginfo->aggfn.dobj.namespace->dobj.name);
/* Get aggregate-specific details */
if (fout->remoteVersion >= 90400)
if (fout->remoteVersion >= 90600)
{
appendPQExpBuffer(query, "SELECT aggtransfn, "
"aggfinalfn, aggtranstype::pg_catalog.regtype, "
"aggcombinefn, aggmtransfn, "
"aggminvtransfn, aggmfinalfn, aggmtranstype::pg_catalog.regtype, "
"aggfinalextra, aggmfinalextra, "
"aggsortop::pg_catalog.regoperator, "
"(aggkind = 'h') AS hypothetical, "
"aggtransspace, agginitval, "
"aggmtransspace, aggminitval, "
"true AS convertok, "
"pg_catalog.pg_get_function_arguments(p.oid) AS funcargs, "
"pg_catalog.pg_get_function_identity_arguments(p.oid) AS funciargs "
"FROM pg_catalog.pg_aggregate a, pg_catalog.pg_proc p "
"WHERE a.aggfnoid = p.oid "
"AND p.oid = '%u'::pg_catalog.oid",
agginfo->aggfn.dobj.catId.oid);
}
else if (fout->remoteVersion >= 90400)
{
appendPQExpBuffer(query, "SELECT aggtransfn, "
"aggfinalfn, aggtranstype::pg_catalog.regtype, "
......@@ -12539,6 +12560,7 @@ dumpAgg(Archive *fout, AggInfo *agginfo)
i_aggtransfn = PQfnumber(res, "aggtransfn");
i_aggfinalfn = PQfnumber(res, "aggfinalfn");
i_aggcombinefn = PQfnumber(res, "aggcombinefn");
i_aggmtransfn = PQfnumber(res, "aggmtransfn");
i_aggminvtransfn = PQfnumber(res, "aggminvtransfn");
i_aggmfinalfn = PQfnumber(res, "aggmfinalfn");
......@@ -12556,6 +12578,7 @@ dumpAgg(Archive *fout, AggInfo *agginfo)
aggtransfn = PQgetvalue(res, 0, i_aggtransfn);
aggfinalfn = PQgetvalue(res, 0, i_aggfinalfn);
aggcombinefn = PQgetvalue(res, 0, i_aggcombinefn);
aggmtransfn = PQgetvalue(res, 0, i_aggmtransfn);
aggminvtransfn = PQgetvalue(res, 0, i_aggminvtransfn);
aggmfinalfn = PQgetvalue(res, 0, i_aggmfinalfn);
......@@ -12644,6 +12667,11 @@ dumpAgg(Archive *fout, AggInfo *agginfo)
appendPQExpBufferStr(details, ",\n FINALFUNC_EXTRA");
}
if (strcmp(aggcombinefn, "-") != 0)
{
appendPQExpBuffer(details, ",\n COMBINEFUNC = %s", aggcombinefn);
}
if (strcmp(aggmtransfn, "-") != 0)
{
appendPQExpBuffer(details, ",\n MSFUNC = %s,\n MINVFUNC = %s,\n MSTYPE = %s",
......
......@@ -53,6 +53,6 @@
*/
/* yyyymmddN */
#define CATALOG_VERSION_NO 201601192
#define CATALOG_VERSION_NO 201601201
#endif
This diff is collapsed.
......@@ -1851,6 +1851,8 @@ typedef struct AggState
AggStatePerTrans curpertrans; /* currently active trans state */
bool input_done; /* indicates end of input */
bool agg_done; /* indicates completion of Agg scan */
bool combineStates; /* input tuples contain transition states */
bool finalizeAggs; /* should we call the finalfn on agg states? */
int projected_set; /* The last projected grouping set */
int current_set; /* The current grouping set being evaluated */
Bitmapset *grouped_cols; /* grouped cols in current projection */
......
......@@ -726,6 +726,8 @@ typedef struct Agg
AggStrategy aggstrategy;
int numCols; /* number of grouping columns */
AttrNumber *grpColIdx; /* their indexes in the target list */
bool combineStates; /* input tuples contain transition states */
bool finalizeAggs; /* should we call the finalfn on agg states? */
Oid *grpOperators; /* equality operators to compare with */
long numGroups; /* estimated number of groups in input */
List *groupingSets; /* grouping sets to use */
......
......@@ -60,9 +60,8 @@ extern Sort *make_sort_from_groupcols(PlannerInfo *root, List *groupcls,
extern Agg *make_agg(PlannerInfo *root, List *tlist, List *qual,
AggStrategy aggstrategy, const AggClauseCosts *aggcosts,
int numGroupCols, AttrNumber *grpColIdx, Oid *grpOperators,
List *groupingSets,
long numGroups,
Plan *lefttree);
List *groupingSets, long numGroups, bool combineStates,
bool finalizeAggs, Plan *lefttree);
extern WindowAgg *make_windowagg(PlannerInfo *root, List *tlist,
List *windowFuncs, Index winref,
int partNumCols, AttrNumber *partColIdx, Oid *partOperators,
......
......@@ -46,6 +46,11 @@ extern void build_aggregate_transfn_expr(Oid *agg_input_types,
Expr **transfnexpr,
Expr **invtransfnexpr);
extern void build_aggregate_combinefn_expr(Oid agg_state_type,
Oid agg_input_collation,
Oid combinefn_oid,
Expr **combinefnexpr);
extern void build_aggregate_finalfn_expr(Oid *agg_input_types,
int num_finalfn_inputs,
Oid agg_state_type,
......
......@@ -101,6 +101,24 @@ CREATE AGGREGATE sumdouble (float8)
msfunc = float8pl,
minvfunc = float8mi
);
-- Test aggregate combine function
-- ensure create aggregate works.
CREATE AGGREGATE mysum (int)
(
stype = int,
sfunc = int4pl,
combinefunc = int4pl
);
-- Ensure all these functions made it into the catalog
SELECT aggfnoid,aggtransfn,aggcombinefn,aggtranstype
FROM pg_aggregate
WHERE aggfnoid = 'mysum'::REGPROC;
aggfnoid | aggtransfn | aggcombinefn | aggtranstype
----------+------------+--------------+--------------
mysum | int4pl | int4pl | 23
(1 row)
DROP AGGREGATE mysum (int);
-- invalid: nonstrict inverse with strict forward function
CREATE FUNCTION float8mi_n(float8, float8) RETURNS float8 AS
$$ SELECT $1 - $2; $$
......
......@@ -115,6 +115,23 @@ CREATE AGGREGATE sumdouble (float8)
minvfunc = float8mi
);
-- Test aggregate combine function
-- ensure create aggregate works.
CREATE AGGREGATE mysum (int)
(
stype = int,
sfunc = int4pl,
combinefunc = int4pl
);
-- Ensure all these functions made it into the catalog
SELECT aggfnoid,aggtransfn,aggcombinefn,aggtranstype
FROM pg_aggregate
WHERE aggfnoid = 'mysum'::REGPROC;
DROP AGGREGATE mysum (int);
-- invalid: nonstrict inverse with strict forward function
CREATE FUNCTION float8mi_n(float8, float8) RETURNS float8 AS
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment