Commit 6cb86143 authored by Tom Lane's avatar Tom Lane

Allow aggregates to provide estimates of their transition state data size.

Formerly the planner had a hard-wired rule of thumb for guessing the amount
of space consumed by an aggregate function's transition state data.  This
estimate is critical to deciding whether it's OK to use hash aggregation,
and in many situations the built-in estimate isn't very good.  This patch
adds a column to pg_aggregate wherein a per-aggregate estimate can be
provided, overriding the planner's default, and infrastructure for setting
the column via CREATE AGGREGATE.

It may be that additional smarts will be required in future, perhaps even
a per-aggregate estimation function.  But this is already a step forward.

This is extracted from a larger patch to improve the performance of numeric
and int8 aggregates.  I (tgl) thought it was worth reviewing and committing
this infrastructure separately.  In this commit, all built-in aggregates
are given aggtransspace = 0, so no behavior should change.

Hadi Moshayedi, reviewed by Pavel Stehule and Tomas Vondra
parent 55c3d86a
......@@ -372,6 +372,13 @@
<entry><literal><link linkend="catalog-pg-type"><structname>pg_type</structname></link>.oid</literal></entry>
<entry>Data type of the aggregate function's internal transition (state) data</entry>
</row>
<row>
<entry><structfield>aggtransspace</structfield></entry>
<entry><type>int4</type></entry>
<entry></entry>
<entry>Approximate average size (in bytes) of the transition state
data, or zero to use a default estimate</entry>
</row>
<row>
<entry><structfield>agginitval</structfield></entry>
<entry><type>text</type></entry>
......
......@@ -24,6 +24,7 @@ PostgreSQL documentation
CREATE AGGREGATE <replaceable class="parameter">name</replaceable> ( [ <replaceable class="parameter">argmode</replaceable> ] [ <replaceable class="parameter">arg_name</replaceable> ] <replaceable class="parameter">arg_data_type</replaceable> [ , ... ] ) (
SFUNC = <replaceable class="PARAMETER">sfunc</replaceable>,
STYPE = <replaceable class="PARAMETER">state_data_type</replaceable>
[ , SSPACE = <replaceable class="PARAMETER">state_data_size</replaceable> ]
[ , FINALFUNC = <replaceable class="PARAMETER">ffunc</replaceable> ]
[ , INITCOND = <replaceable class="PARAMETER">initial_condition</replaceable> ]
[ , SORTOP = <replaceable class="PARAMETER">sort_operator</replaceable> ]
......@@ -35,6 +36,7 @@ CREATE AGGREGATE <replaceable class="PARAMETER">name</replaceable> (
BASETYPE = <replaceable class="PARAMETER">base_type</replaceable>,
SFUNC = <replaceable class="PARAMETER">sfunc</replaceable>,
STYPE = <replaceable class="PARAMETER">state_data_type</replaceable>
[ , SSPACE = <replaceable class="PARAMETER">state_data_size</replaceable> ]
[ , FINALFUNC = <replaceable class="PARAMETER">ffunc</replaceable> ]
[ , INITCOND = <replaceable class="PARAMETER">initial_condition</replaceable> ]
[ , SORTOP = <replaceable class="PARAMETER">sort_operator</replaceable> ]
......@@ -264,6 +266,22 @@ SELECT col FROM tab ORDER BY col USING sortop LIMIT 1;
</listitem>
</varlistentry>
<varlistentry>
<term><replaceable class="PARAMETER">state_data_size</replaceable></term>
<listitem>
<para>
The approximate average size (in bytes) of the aggregate's state value.
If this parameter is omitted or is zero, a default estimate is used
based on the <replaceable>state_data_type</>.
The planner uses this value to estimate the memory required for a
grouped aggregate query. The planner will consider using hash
aggregation for such a query only if the hash table is estimated to fit
in <xref linkend="guc-work-mem">; therefore, large values of this
parameter discourage use of hash aggregation.
</para>
</listitem>
</varlistentry>
<varlistentry>
<term><replaceable class="PARAMETER">ffunc</replaceable></term>
<listitem>
......
......@@ -55,6 +55,7 @@ AggregateCreate(const char *aggName,
List *aggfinalfnName,
List *aggsortopName,
Oid aggTransType,
int32 aggTransSpace,
const char *agginitval)
{
Relation aggdesc;
......@@ -273,6 +274,7 @@ AggregateCreate(const char *aggName,
values[Anum_pg_aggregate_aggfinalfn - 1] = ObjectIdGetDatum(finalfn);
values[Anum_pg_aggregate_aggsortop - 1] = ObjectIdGetDatum(sortop);
values[Anum_pg_aggregate_aggtranstype - 1] = ObjectIdGetDatum(aggTransType);
values[Anum_pg_aggregate_aggtransspace - 1] = Int32GetDatum(aggTransSpace);
if (agginitval)
values[Anum_pg_aggregate_agginitval - 1] = CStringGetTextDatum(agginitval);
else
......
......@@ -60,6 +60,7 @@ DefineAggregate(List *name, List *args, bool oldstyle, List *parameters,
List *sortoperatorName = NIL;
TypeName *baseType = NULL;
TypeName *transType = NULL;
int32 transSpace = 0;
char *initval = NULL;
int numArgs;
oidvector *parameterTypes;
......@@ -102,6 +103,8 @@ DefineAggregate(List *name, List *args, bool oldstyle, List *parameters,
transType = defGetTypeName(defel);
else if (pg_strcasecmp(defel->defname, "stype1") == 0)
transType = defGetTypeName(defel);
else if (pg_strcasecmp(defel->defname, "sspace") == 0)
transSpace = defGetInt32(defel);
else if (pg_strcasecmp(defel->defname, "initcond") == 0)
initval = defGetString(defel);
else if (pg_strcasecmp(defel->defname, "initcond1") == 0)
......@@ -248,5 +251,6 @@ DefineAggregate(List *name, List *args, bool oldstyle, List *parameters,
finalfuncName, /* final function name */
sortoperatorName, /* sort operator name */
transTypeId, /* transition data type */
transSpace, /* transition space */
initval); /* initial condition */
}
......@@ -164,6 +164,30 @@ defGetBoolean(DefElem *def)
return false; /* keep compiler quiet */
}
/*
* Extract an int32 value from a DefElem.
*/
int32
defGetInt32(DefElem *def)
{
if (def->arg == NULL)
ereport(ERROR,
(errcode(ERRCODE_SYNTAX_ERROR),
errmsg("%s requires an integer value",
def->defname)));
switch (nodeTag(def->arg))
{
case T_Integer:
return (int32) intVal(def->arg);
default:
ereport(ERROR,
(errcode(ERRCODE_SYNTAX_ERROR),
errmsg("%s requires an integer value",
def->defname)));
}
return 0; /* keep compiler quiet */
}
/*
* Extract an int64 value from a DefElem.
*/
......
......@@ -461,6 +461,7 @@ count_agg_clauses_walker(Node *node, count_agg_clauses_context *context)
Oid aggtransfn;
Oid aggfinalfn;
Oid aggtranstype;
int32 aggtransspace;
QualCost argcosts;
Oid *inputTypes;
int numArguments;
......@@ -478,6 +479,7 @@ count_agg_clauses_walker(Node *node, count_agg_clauses_context *context)
aggtransfn = aggform->aggtransfn;
aggfinalfn = aggform->aggfinalfn;
aggtranstype = aggform->aggtranstype;
aggtransspace = aggform->aggtransspace;
ReleaseSysCache(aggTuple);
/* count it */
......@@ -541,22 +543,30 @@ count_agg_clauses_walker(Node *node, count_agg_clauses_context *context)
*/
if (!get_typbyval(aggtranstype))
{
int32 aggtranstypmod;
int32 avgwidth;
/* Use average width if aggregate definition gave one */
if (aggtransspace > 0)
avgwidth = aggtransspace;
else
{
/*
* If transition state is of same type as first input, assume it's
* the same typmod (same width) as well. This works for cases
* like MAX/MIN and is probably somewhat reasonable otherwise.
* If transition state is of same type as first input, assume
* it's the same typmod (same width) as well. This works for
* cases like MAX/MIN and is probably somewhat reasonable
* otherwise.
*/
int32 aggtranstypmod;
if (numArguments > 0 && aggtranstype == inputTypes[0])
aggtranstypmod = exprTypmod((Node *) linitial(aggref->args));
else
aggtranstypmod = -1;
avgwidth = get_typavgwidth(aggtranstype, aggtranstypmod);
avgwidth = MAXALIGN(avgwidth);
}
avgwidth = MAXALIGN(avgwidth);
costs->transitionSpace += avgwidth + 2 * sizeof(void *);
}
else if (aggtranstype == INTERNALOID)
......@@ -564,11 +574,15 @@ count_agg_clauses_walker(Node *node, count_agg_clauses_context *context)
/*
* INTERNAL transition type is a special case: although INTERNAL
* is pass-by-value, it's almost certainly being used as a pointer
* to some large data structure. We assume usage of
* to some large data structure. The aggregate definition can
* provide an estimate of the size. If it doesn't, then we assume
* ALLOCSET_DEFAULT_INITSIZE, which is a good guess if the data is
* being kept in a private memory context, as is done by
* array_agg() for instance.
*/
if (aggtransspace > 0)
costs->transitionSpace += aggtransspace;
else
costs->transitionSpace += ALLOCSET_DEFAULT_INITSIZE;
}
......
......@@ -11521,12 +11521,14 @@ dumpAgg(Archive *fout, AggInfo *agginfo)
int i_aggfinalfn;
int i_aggsortop;
int i_aggtranstype;
int i_aggtransspace;
int i_agginitval;
int i_convertok;
const char *aggtransfn;
const char *aggfinalfn;
const char *aggsortop;
const char *aggtranstype;
const char *aggtransspace;
const char *agginitval;
bool convertok;
......@@ -11544,12 +11546,26 @@ dumpAgg(Archive *fout, AggInfo *agginfo)
selectSourceSchema(fout, agginfo->aggfn.dobj.namespace->dobj.name);
/* Get aggregate-specific details */
if (fout->remoteVersion >= 80400)
if (fout->remoteVersion >= 90400)
{
appendPQExpBuffer(query, "SELECT aggtransfn, "
"aggfinalfn, aggtranstype::pg_catalog.regtype, "
"aggsortop::pg_catalog.regoperator, "
"aggtransspace, agginitval, "
"'t'::boolean AS convertok, "
"pg_catalog.pg_get_function_arguments(p.oid) AS funcargs, "
"pg_catalog.pg_get_function_identity_arguments(p.oid) AS funciargs "
"FROM pg_catalog.pg_aggregate a, pg_catalog.pg_proc p "
"WHERE a.aggfnoid = p.oid "
"AND p.oid = '%u'::pg_catalog.oid",
agginfo->aggfn.dobj.catId.oid);
}
else if (fout->remoteVersion >= 80400)
{
appendPQExpBuffer(query, "SELECT aggtransfn, "
"aggfinalfn, aggtranstype::pg_catalog.regtype, "
"aggsortop::pg_catalog.regoperator, "
"agginitval, "
"0 AS aggtransspace, agginitval, "
"'t'::boolean AS convertok, "
"pg_catalog.pg_get_function_arguments(p.oid) AS funcargs, "
"pg_catalog.pg_get_function_identity_arguments(p.oid) AS funciargs "
......@@ -11563,7 +11579,7 @@ dumpAgg(Archive *fout, AggInfo *agginfo)
appendPQExpBuffer(query, "SELECT aggtransfn, "
"aggfinalfn, aggtranstype::pg_catalog.regtype, "
"aggsortop::pg_catalog.regoperator, "
"agginitval, "
"0 AS aggtransspace, agginitval, "
"'t'::boolean AS convertok "
"FROM pg_catalog.pg_aggregate a, pg_catalog.pg_proc p "
"WHERE a.aggfnoid = p.oid "
......@@ -11575,7 +11591,7 @@ dumpAgg(Archive *fout, AggInfo *agginfo)
appendPQExpBuffer(query, "SELECT aggtransfn, "
"aggfinalfn, aggtranstype::pg_catalog.regtype, "
"0 AS aggsortop, "
"agginitval, "
"0 AS aggtransspace, agginitval, "
"'t'::boolean AS convertok "
"FROM pg_catalog.pg_aggregate a, pg_catalog.pg_proc p "
"WHERE a.aggfnoid = p.oid "
......@@ -11587,7 +11603,7 @@ dumpAgg(Archive *fout, AggInfo *agginfo)
appendPQExpBuffer(query, "SELECT aggtransfn, aggfinalfn, "
"format_type(aggtranstype, NULL) AS aggtranstype, "
"0 AS aggsortop, "
"agginitval, "
"0 AS aggtransspace, agginitval, "
"'t'::boolean AS convertok "
"FROM pg_aggregate "
"WHERE oid = '%u'::oid",
......@@ -11599,7 +11615,7 @@ dumpAgg(Archive *fout, AggInfo *agginfo)
"aggfinalfn, "
"(SELECT typname FROM pg_type WHERE oid = aggtranstype1) AS aggtranstype, "
"0 AS aggsortop, "
"agginitval1 AS agginitval, "
"0 AS aggtransspace, agginitval1 AS agginitval, "
"(aggtransfn2 = 0 and aggtranstype2 = 0 and agginitval2 is null) AS convertok "
"FROM pg_aggregate "
"WHERE oid = '%u'::oid",
......@@ -11612,6 +11628,7 @@ dumpAgg(Archive *fout, AggInfo *agginfo)
i_aggfinalfn = PQfnumber(res, "aggfinalfn");
i_aggsortop = PQfnumber(res, "aggsortop");
i_aggtranstype = PQfnumber(res, "aggtranstype");
i_aggtransspace = PQfnumber(res, "aggtransspace");
i_agginitval = PQfnumber(res, "agginitval");
i_convertok = PQfnumber(res, "convertok");
......@@ -11619,6 +11636,7 @@ dumpAgg(Archive *fout, AggInfo *agginfo)
aggfinalfn = PQgetvalue(res, 0, i_aggfinalfn);
aggsortop = PQgetvalue(res, 0, i_aggsortop);
aggtranstype = PQgetvalue(res, 0, i_aggtranstype);
aggtransspace = PQgetvalue(res, 0, i_aggtransspace);
agginitval = PQgetvalue(res, 0, i_agginitval);
convertok = (PQgetvalue(res, 0, i_convertok)[0] == 't');
......@@ -11672,6 +11690,12 @@ dumpAgg(Archive *fout, AggInfo *agginfo)
fmtId(aggtranstype));
}
if (strcmp(aggtransspace, "0") != 0)
{
appendPQExpBuffer(details, ",\n SSPACE = %s",
aggtransspace);
}
if (!PQgetisnull(res, 0, i_agginitval))
{
appendPQExpBuffer(details, ",\n INITCOND = ");
......
......@@ -53,6 +53,6 @@
*/
/* yyyymmddN */
#define CATALOG_VERSION_NO 201311081
#define CATALOG_VERSION_NO 201311161
#endif
This diff is collapsed.
......@@ -133,6 +133,7 @@ extern Datum transformGenericOptions(Oid catalogId,
extern char *defGetString(DefElem *def);
extern double defGetNumeric(DefElem *def);
extern bool defGetBoolean(DefElem *def);
extern int32 defGetInt32(DefElem *def);
extern int64 defGetInt64(DefElem *def);
extern List *defGetQualifiedName(DefElem *def);
extern TypeName *defGetTypeName(DefElem *def);
......
......@@ -56,7 +56,7 @@ create aggregate aggfstr(integer,integer,text) (
initcond = '{}'
);
create aggregate aggfns(integer,integer,text) (
sfunc = aggfns_trans, stype = aggtype[],
sfunc = aggfns_trans, stype = aggtype[], sspace = 10000,
initcond = '{}'
);
-- variadic aggregate
......
......@@ -702,7 +702,7 @@ SELECT * FROM funcdescs
-- Look for illegal values in pg_aggregate fields.
SELECT ctid, aggfnoid::oid
FROM pg_aggregate as p1
WHERE aggfnoid = 0 OR aggtransfn = 0 OR aggtranstype = 0;
WHERE aggfnoid = 0 OR aggtransfn = 0 OR aggtranstype = 0 OR aggtransspace < 0;
ctid | aggfnoid
------+----------
(0 rows)
......
......@@ -68,7 +68,7 @@ create aggregate aggfstr(integer,integer,text) (
);
create aggregate aggfns(integer,integer,text) (
sfunc = aggfns_trans, stype = aggtype[],
sfunc = aggfns_trans, stype = aggtype[], sspace = 10000,
initcond = '{}'
);
......
......@@ -567,7 +567,7 @@ SELECT * FROM funcdescs
SELECT ctid, aggfnoid::oid
FROM pg_aggregate as p1
WHERE aggfnoid = 0 OR aggtransfn = 0 OR aggtranstype = 0;
WHERE aggfnoid = 0 OR aggtransfn = 0 OR aggtranstype = 0 OR aggtransspace < 0;
-- Make sure the matching pg_proc entry is sensible, too.
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment