Commit 4de2d4fb authored by Tom Lane's avatar Tom Lane

Explicitly track whether aggregate final functions modify transition state.

Up to now, there's been hard-wired assumptions that normal aggregates'
final functions never modify their transition states, while ordered-set
aggregates' final functions always do.  This has always been a bit
limiting, and in particular it's getting in the way of improving the
built-in ordered-set aggregates to allow merging of transition states.
Therefore, let's introduce catalog and CREATE AGGREGATE infrastructure
that lets the finalfn's behavior be declared explicitly.

There are now three possibilities for the finalfn behavior: it's purely
read-only, it trashes the transition state irrecoverably, or it changes
the state in such a way that no more transfn calls are possible but the
state can still be passed to other, compatible finalfns.  There are no
examples of this third case today, but we'll shortly make the built-in
OSAs act like that.

This change allows user-defined aggregates to explicitly disclaim support
for use as window functions, and/or to prevent transition state merging,
if their implementations cannot handle that.  While it was previously
possible to handle the window case with a run-time error check, there was
not any way to prevent transition state merging, which in retrospect is
something commit 804163bc should have provided for.  But better late
than never.

In passing, split out pg_aggregate.c's extern function declarations into
a new header file pg_aggregate_fn.h, similarly to what we've done for
some other catalog headers, so that pg_aggregate.h itself can be safe
for frontend files to include.  This lets pg_dump use the symbolic
names for relevant constants.

Discussion: https://postgr.es/m/4834.1507849699@sss.pgh.pa.us
parent 5f340cb3
......@@ -486,6 +486,26 @@
<entry></entry>
<entry>True to pass extra dummy arguments to <structfield>aggmfinalfn</structfield></entry>
</row>
<row>
<entry><structfield>aggfinalmodify</structfield></entry>
<entry><type>char</type></entry>
<entry></entry>
<entry>Whether <structfield>aggfinalfn</structfield> modifies the
transition state value:
<literal>r</literal> if it is read-only,
<literal>s</literal> if the <structfield>aggtransfn</structfield>
cannot be applied after the <structfield>aggfinalfn</structfield>, or
<literal>w</literal> if it writes on the value
</entry>
</row>
<row>
<entry><structfield>aggmfinalmodify</structfield></entry>
<entry><type>char</type></entry>
<entry></entry>
<entry>Like <structfield>aggfinalmodify</structfield>, but for
the <structfield>aggmfinalfn</structfield>
</entry>
</row>
<row>
<entry><structfield>aggsortop</structfield></entry>
<entry><type>oid</type></entry>
......
......@@ -27,6 +27,7 @@ CREATE AGGREGATE <replaceable class="parameter">name</replaceable> ( [ <replacea
[ , SSPACE = <replaceable class="parameter">state_data_size</replaceable> ]
[ , FINALFUNC = <replaceable class="parameter">ffunc</replaceable> ]
[ , FINALFUNC_EXTRA ]
[ , FINALFUNC_MODIFY = { READ_ONLY | SHARABLE | READ_WRITE } ]
[ , COMBINEFUNC = <replaceable class="parameter">combinefunc</replaceable> ]
[ , SERIALFUNC = <replaceable class="parameter">serialfunc</replaceable> ]
[ , DESERIALFUNC = <replaceable class="parameter">deserialfunc</replaceable> ]
......@@ -37,6 +38,7 @@ CREATE AGGREGATE <replaceable class="parameter">name</replaceable> ( [ <replacea
[ , MSSPACE = <replaceable class="parameter">mstate_data_size</replaceable> ]
[ , MFINALFUNC = <replaceable class="parameter">mffunc</replaceable> ]
[ , MFINALFUNC_EXTRA ]
[ , MFINALFUNC_MODIFY = { READ_ONLY | SHARABLE | READ_WRITE } ]
[ , MINITCOND = <replaceable class="parameter">minitial_condition</replaceable> ]
[ , SORTOP = <replaceable class="parameter">sort_operator</replaceable> ]
[ , PARALLEL = { SAFE | RESTRICTED | UNSAFE } ]
......@@ -49,6 +51,7 @@ CREATE AGGREGATE <replaceable class="parameter">name</replaceable> ( [ [ <replac
[ , SSPACE = <replaceable class="parameter">state_data_size</replaceable> ]
[ , FINALFUNC = <replaceable class="parameter">ffunc</replaceable> ]
[ , FINALFUNC_EXTRA ]
[ , FINALFUNC_MODIFY = { READ_ONLY | SHARABLE | READ_WRITE } ]
[ , INITCOND = <replaceable class="parameter">initial_condition</replaceable> ]
[ , PARALLEL = { SAFE | RESTRICTED | UNSAFE } ]
[ , HYPOTHETICAL ]
......@@ -63,6 +66,7 @@ CREATE AGGREGATE <replaceable class="parameter">name</replaceable> (
[ , SSPACE = <replaceable class="parameter">state_data_size</replaceable> ]
[ , FINALFUNC = <replaceable class="parameter">ffunc</replaceable> ]
[ , FINALFUNC_EXTRA ]
[ , FINALFUNC_MODIFY = { READ_ONLY | SHARABLE | READ_WRITE } ]
[ , COMBINEFUNC = <replaceable class="parameter">combinefunc</replaceable> ]
[ , SERIALFUNC = <replaceable class="parameter">serialfunc</replaceable> ]
[ , DESERIALFUNC = <replaceable class="parameter">deserialfunc</replaceable> ]
......@@ -73,6 +77,7 @@ CREATE AGGREGATE <replaceable class="parameter">name</replaceable> (
[ , MSSPACE = <replaceable class="parameter">mstate_data_size</replaceable> ]
[ , MFINALFUNC = <replaceable class="parameter">mffunc</replaceable> ]
[ , MFINALFUNC_EXTRA ]
[ , MFINALFUNC_MODIFY = { READ_ONLY | SHARABLE | READ_WRITE } ]
[ , MINITCOND = <replaceable class="parameter">minitial_condition</replaceable> ]
[ , SORTOP = <replaceable class="parameter">sort_operator</replaceable> ]
)
......@@ -197,7 +202,8 @@ CREATE AGGREGATE <replaceable class="parameter">name</replaceable> (
as described in <xref linkend="xaggr-moving-aggregates">. This requires
specifying the <literal>MSFUNC</>, <literal>MINVFUNC</>,
and <literal>MSTYPE</> parameters, and optionally
the <literal>MSPACE</>, <literal>MFINALFUNC</>, <literal>MFINALFUNC_EXTRA</>,
the <literal>MSPACE</>, <literal>MFINALFUNC</>,
<literal>MFINALFUNC_EXTRA</>, <literal>MFINALFUNC_MODIFY</>,
and <literal>MINITCOND</> parameters. Except for <literal>MINVFUNC</>,
these parameters work like the corresponding simple-aggregate parameters
without <literal>M</>; they define a separate implementation of the
......@@ -412,6 +418,21 @@ SELECT col FROM tab ORDER BY col USING sortop LIMIT 1;
</listitem>
</varlistentry>
<varlistentry>
<term><literal>FINALFUNC_MODIFY</> = { <literal>READ_ONLY</> | <literal>SHARABLE</> | <literal>READ_WRITE</> }</term>
<listitem>
<para>
This option specifies whether the final function is a pure function
that does not modify its arguments. <literal>READ_ONLY</> indicates
it does not; the other two values indicate that it may change the
transition state value. See <xref linkend="sql-createaggregate-notes"
endterm="sql-createaggregate-notes-title"> below for more detail. The
default is <literal>READ_ONLY</>, except for ordered-set aggregates,
for which the default is <literal>READ_WRITE</>.
</para>
</listitem>
</varlistentry>
<varlistentry>
<term><replaceable class="parameter">combinefunc</replaceable></term>
<listitem>
......@@ -563,6 +584,16 @@ SELECT col FROM tab ORDER BY col USING sortop LIMIT 1;
</listitem>
</varlistentry>
<varlistentry>
<term><literal>MFINALFUNC_MODIFY</> = { <literal>READ_ONLY</> | <literal>SHARABLE</> | <literal>READ_WRITE</> }</term>
<listitem>
<para>
This option is like <literal>FINALFUNC_MODIFY</>, but it describes
the behavior of the moving-aggregate final function.
</para>
</listitem>
</varlistentry>
<varlistentry>
<term><replaceable class="parameter">minitial_condition</replaceable></term>
<listitem>
......@@ -587,12 +618,12 @@ SELECT col FROM tab ORDER BY col USING sortop LIMIT 1;
</varlistentry>
<varlistentry>
<term><literal>PARALLEL</literal></term>
<term><literal>PARALLEL =</> { <literal>SAFE</> | <literal>RESTRICTED</> | <literal>UNSAFE</> }</term>
<listitem>
<para>
The meanings of <literal>PARALLEL SAFE</>, <literal>PARALLEL
RESTRICTED</>, and <literal>PARALLEL UNSAFE</> are the same as
for <xref linkend="sql-createfunction">. An aggregate will not be
in <xref linkend="sql-createfunction">. An aggregate will not be
considered for parallelization if it is marked <literal>PARALLEL
UNSAFE</> (which is the default!) or <literal>PARALLEL RESTRICTED</>.
Note that the parallel-safety markings of the aggregate's support
......@@ -624,8 +655,8 @@ SELECT col FROM tab ORDER BY col USING sortop LIMIT 1;
</para>
</refsect1>
<refsect1>
<title>Notes</title>
<refsect1 id="sql-createaggregate-notes">
<title id="sql-createaggregate-notes-title">Notes</title>
<para>
In parameters that specify support function names, you can write
......@@ -634,6 +665,34 @@ SELECT col FROM tab ORDER BY col USING sortop LIMIT 1;
of the support functions are determined from other parameters.
</para>
<para>
Ordinarily, Postgres functions are expected to be true functions that
do not modify their input values. However, an aggregate transition
function, <emphasis>when used in the context of an aggregate</>,
is allowed to cheat and modify its transition-state argument in place.
This can provide substantial performance benefits compared to making
a fresh copy of the transition state each time.
</para>
<para>
Likewise, while an aggregate final function is normally expected not to
modify its input values, sometimes it is impractical to avoid modifying
the transition-state argument. Such behavior must be declared using
the <literal>FINALFUNC_MODIFY</> parameter. The <literal>READ_WRITE</>
value indicates that the final function modifies the transition state in
unspecified ways. This value prevents use of the aggregate as a window
function, and it also prevents merging of transition states for aggregate
calls that share the same input values and transition functions.
The <literal>SHARABLE</> value indicates that the transition function
cannot be applied after the final function, but multiple final-function
calls can be performed on the ending transition state value. This value
prevents use of the aggregate as a window function, but it allows merging
of transition states. (That is, the optimization of interest here is not
applying the same final function repeatedly, but applying different final
functions to the same ending transition state value. This is allowed as
long as none of the final functions are marked <literal>READ_WRITE</>.)
</para>
<para>
If an aggregate supports moving-aggregate mode, it will improve
calculation efficiency when the aggregate is used as a window function
......@@ -671,7 +730,8 @@ SELECT col FROM tab ORDER BY col USING sortop LIMIT 1;
Note that whether or not the aggregate supports moving-aggregate
mode, <productname>PostgreSQL</productname> can handle a moving frame
end without recalculation; this is done by continuing to add new values
to the aggregate's state. It is assumed that the final function does
to the aggregate's state. This is why use of an aggregate as a window
function requires that the final function be read-only: it must
not damage the aggregate's state value, so that the aggregation can be
continued even after an aggregate result value has been obtained for
one set of frame boundaries.
......
......@@ -487,6 +487,13 @@ SELECT percentile_disc(0.5) WITHIN GROUP (ORDER BY income) FROM households;
C, since their state values aren't definable as any SQL data type.
(In the above example, notice that the state value is declared as
type <type>internal</> &mdash; this is typical.)
Also, because the final function performs the sort, it is not possible
to continue adding input rows by executing the transition function again
later. This means the final function is not <literal>READ_ONLY</>;
it must be declared in <xref linkend="sql-createaggregate">
as <literal>READ_WRITE</>, or as <literal>SHARABLE</> if it's
possible for additional final-function calls to make use of the
already-sorted state.
</para>
<para>
......@@ -622,16 +629,15 @@ SELECT percentile_disc(0.5) WITHIN GROUP (ORDER BY income) FROM households;
<programlisting>
if (AggCheckCallContext(fcinfo, NULL))
</programlisting>
One reason for checking this is that when it is true for a transition
function, the first input
One reason for checking this is that when it is true, the first input
must be a temporary state value and can therefore safely be modified
in-place rather than allocating a new copy.
See <function>int8inc()</> for an example.
(This is the <emphasis>only</>
case where it is safe for a function to modify a pass-by-reference input.
In particular, final functions for normal aggregates must not
modify their inputs in any case, because in some cases they will be
re-executed on the same final state value.)
(While aggregate transition functions are always allowed to modify
the transition value in-place, aggregate final functions are generally
discouraged from doing so; if they do so, the behavior must be declared
when creating the aggregate. See <xref linkend="sql-createaggregate">
for more detail.)
</para>
<para>
......
......@@ -19,6 +19,7 @@
#include "catalog/dependency.h"
#include "catalog/indexing.h"
#include "catalog/pg_aggregate.h"
#include "catalog/pg_aggregate_fn.h"
#include "catalog/pg_language.h"
#include "catalog/pg_operator.h"
#include "catalog/pg_proc.h"
......@@ -65,6 +66,8 @@ AggregateCreate(const char *aggName,
List *aggmfinalfnName,
bool finalfnExtraArgs,
bool mfinalfnExtraArgs,
char finalfnModify,
char mfinalfnModify,
List *aggsortopName,
Oid aggTransType,
int32 aggTransSpace,
......@@ -656,6 +659,8 @@ AggregateCreate(const char *aggName,
values[Anum_pg_aggregate_aggmfinalfn - 1] = ObjectIdGetDatum(mfinalfn);
values[Anum_pg_aggregate_aggfinalextra - 1] = BoolGetDatum(finalfnExtraArgs);
values[Anum_pg_aggregate_aggmfinalextra - 1] = BoolGetDatum(mfinalfnExtraArgs);
values[Anum_pg_aggregate_aggfinalmodify - 1] = CharGetDatum(finalfnModify);
values[Anum_pg_aggregate_aggmfinalmodify - 1] = CharGetDatum(mfinalfnModify);
values[Anum_pg_aggregate_aggsortop - 1] = ObjectIdGetDatum(sortop);
values[Anum_pg_aggregate_aggtranstype - 1] = ObjectIdGetDatum(aggTransType);
values[Anum_pg_aggregate_aggtransspace - 1] = Int32GetDatum(aggTransSpace);
......
......@@ -26,6 +26,7 @@
#include "catalog/dependency.h"
#include "catalog/indexing.h"
#include "catalog/pg_aggregate.h"
#include "catalog/pg_aggregate_fn.h"
#include "catalog/pg_proc.h"
#include "catalog/pg_type.h"
#include "commands/alter.h"
......@@ -39,6 +40,9 @@
#include "utils/syscache.h"
static char extractModify(DefElem *defel);
/*
* DefineAggregate
*
......@@ -67,6 +71,8 @@ DefineAggregate(ParseState *pstate, List *name, List *args, bool oldstyle, List
List *mfinalfuncName = NIL;
bool finalfuncExtraArgs = false;
bool mfinalfuncExtraArgs = false;
char finalfuncModify = 0;
char mfinalfuncModify = 0;
List *sortoperatorName = NIL;
TypeName *baseType = NULL;
TypeName *transType = NULL;
......@@ -143,6 +149,10 @@ DefineAggregate(ParseState *pstate, List *name, List *args, bool oldstyle, List
finalfuncExtraArgs = defGetBoolean(defel);
else if (pg_strcasecmp(defel->defname, "mfinalfunc_extra") == 0)
mfinalfuncExtraArgs = defGetBoolean(defel);
else if (pg_strcasecmp(defel->defname, "finalfunc_modify") == 0)
finalfuncModify = extractModify(defel);
else if (pg_strcasecmp(defel->defname, "mfinalfunc_modify") == 0)
mfinalfuncModify = extractModify(defel);
else if (pg_strcasecmp(defel->defname, "sortop") == 0)
sortoperatorName = defGetQualifiedName(defel);
else if (pg_strcasecmp(defel->defname, "basetype") == 0)
......@@ -235,6 +245,15 @@ DefineAggregate(ParseState *pstate, List *name, List *args, bool oldstyle, List
errmsg("aggregate minitcond must not be specified without mstype")));
}
/*
* Default values for modify flags can only be determined once we know the
* aggKind.
*/
if (finalfuncModify == 0)
finalfuncModify = (aggKind == AGGKIND_NORMAL) ? AGGMODIFY_READ_ONLY : AGGMODIFY_READ_WRITE;
if (mfinalfuncModify == 0)
mfinalfuncModify = (aggKind == AGGKIND_NORMAL) ? AGGMODIFY_READ_ONLY : AGGMODIFY_READ_WRITE;
/*
* look up the aggregate's input datatype(s).
*/
......@@ -437,6 +456,8 @@ DefineAggregate(ParseState *pstate, List *name, List *args, bool oldstyle, List
mfinalfuncName, /* final function name */
finalfuncExtraArgs,
mfinalfuncExtraArgs,
finalfuncModify,
mfinalfuncModify,
sortoperatorName, /* sort operator name */
transTypeId, /* transition data type */
transSpace, /* transition space */
......@@ -446,3 +467,24 @@ DefineAggregate(ParseState *pstate, List *name, List *args, bool oldstyle, List
minitval, /* initial condition */
proparallel); /* parallel safe? */
}
/*
* Convert the string form of [m]finalfunc_modify to the catalog representation
*/
static char
extractModify(DefElem *defel)
{
char *val = defGetString(defel);
if (strcmp(val, "read_only") == 0)
return AGGMODIFY_READ_ONLY;
if (strcmp(val, "sharable") == 0)
return AGGMODIFY_SHARABLE;
if (strcmp(val, "read_write") == 0)
return AGGMODIFY_READ_WRITE;
ereport(ERROR,
(errcode(ERRCODE_SYNTAX_ERROR),
errmsg("parameter \"%s\" must be READ_ONLY, SHARABLE, or READ_WRITE",
defel->defname)));
return 0; /* keep compiler quiet */
}
......@@ -248,9 +248,9 @@ typedef struct AggStatePerTransData
/*
* Link to an Aggref expr this state value is for.
*
* There can be multiple Aggref's sharing the same state value, as long as
* the inputs and transition function are identical. This points to the
* first one of them.
* There can be multiple Aggref's sharing the same state value, so long as
* the inputs and transition functions are identical and the final
* functions are not read-write. This points to the first one of them.
*/
Aggref *aggref;
......@@ -419,8 +419,8 @@ typedef struct AggStatePerAggData
Oid finalfn_oid;
/*
* fmgr lookup data for final function --- only valid when finalfn_oid oid
* is not InvalidOid.
* fmgr lookup data for final function --- only valid when finalfn_oid is
* not InvalidOid.
*/
FmgrInfo finalfn;
......@@ -439,6 +439,11 @@ typedef struct AggStatePerAggData
int16 resulttypeLen;
bool resulttypeByVal;
/*
* "sharable" is false if this agg cannot share state values with other
* aggregates because the final function is read-write.
*/
bool sharable;
} AggStatePerAggData;
/*
......@@ -572,6 +577,7 @@ static void build_pertrans_for_aggref(AggStatePerTrans pertrans,
static int find_compatible_peragg(Aggref *newagg, AggState *aggstate,
int lastaggno, List **same_input_transnos);
static int find_compatible_pertrans(AggState *aggstate, Aggref *newagg,
bool sharable,
Oid aggtransfn, Oid aggtranstype,
Oid aggserialfn, Oid aggdeserialfn,
Datum initValue, bool initValueIsNull,
......@@ -3105,6 +3111,7 @@ ExecInitAgg(Agg *node, EState *estate, int eflags)
AclResult aclresult;
Oid transfn_oid,
finalfn_oid;
bool sharable;
Oid serialfn_oid,
deserialfn_oid;
Expr *finalfnexpr;
......@@ -3177,6 +3184,15 @@ ExecInitAgg(Agg *node, EState *estate, int eflags)
else
peragg->finalfn_oid = finalfn_oid = aggform->aggfinalfn;
/*
* If finalfn is marked read-write, we can't share transition states;
* but it is okay to share states for AGGMODIFY_SHARABLE aggs. Also,
* if we're not executing the finalfn here, we can share regardless.
*/
sharable = (aggform->aggfinalmodify != AGGMODIFY_READ_WRITE) ||
(finalfn_oid == InvalidOid);
peragg->sharable = sharable;
serialfn_oid = InvalidOid;
deserialfn_oid = InvalidOid;
......@@ -3315,11 +3331,12 @@ ExecInitAgg(Agg *node, EState *estate, int eflags)
* 2. Build working state for invoking the transition function, or
* look up previously initialized working state, if we can share it.
*
* find_compatible_peragg() already collected a list of per-Trans's
* with the same inputs. Check if any of them have the same transition
* function and initial value.
* find_compatible_peragg() already collected a list of sharable
* per-Trans's with the same inputs. Check if any of them have the
* same transition function and initial value.
*/
existing_transno = find_compatible_pertrans(aggstate, aggref,
sharable,
transfn_oid, aggtranstype,
serialfn_oid, deserialfn_oid,
initValue, initValueIsNull,
......@@ -3724,10 +3741,10 @@ GetAggInitVal(Datum textInitVal, Oid transtype)
* with this one, with the same input parameters. If no compatible aggregate
* can be found, returns -1.
*
* As a side-effect, this also collects a list of existing per-Trans structs
* with matching inputs. If no identical Aggref is found, the list is passed
* later to find_compatible_pertrans, to see if we can at least reuse the
* state value of another aggregate.
* As a side-effect, this also collects a list of existing, sharable per-Trans
* structs with matching inputs. If no identical Aggref is found, the list is
* passed later to find_compatible_pertrans, to see if we can at least reuse
* the state value of another aggregate.
*/
static int
find_compatible_peragg(Aggref *newagg, AggState *aggstate,
......@@ -3785,9 +3802,13 @@ find_compatible_peragg(Aggref *newagg, AggState *aggstate,
}
/*
* Not identical, but it had the same inputs. Return it to the caller,
* in case we can re-use its per-trans state.
* Not identical, but it had the same inputs. If the final function
* permits sharing, return its transno to the caller, in case we can
* re-use its per-trans state. (If there's already sharing going on,
* we might report a transno more than once. find_compatible_pertrans
* is cheap enough that it's not worth spending cycles to avoid that.)
*/
if (peragg->sharable)
*same_input_transnos = lappend_int(*same_input_transnos,
peragg->transno);
}
......@@ -3804,7 +3825,7 @@ find_compatible_peragg(Aggref *newagg, AggState *aggstate,
* verified to match.)
*/
static int
find_compatible_pertrans(AggState *aggstate, Aggref *newagg,
find_compatible_pertrans(AggState *aggstate, Aggref *newagg, bool sharable,
Oid aggtransfn, Oid aggtranstype,
Oid aggserialfn, Oid aggdeserialfn,
Datum initValue, bool initValueIsNull,
......@@ -3812,14 +3833,8 @@ find_compatible_pertrans(AggState *aggstate, Aggref *newagg,
{
ListCell *lc;
/*
* For the moment, never try to share transition states between different
* ordered-set aggregates. This is necessary because the finalfns of the
* built-in OSAs (see orderedsetaggs.c) are destructive of their
* transition states. We should fix them so we can allow this, but not
* losing performance in the normal non-shared case will take some work.
*/
if (AGGKIND_IS_ORDERED_SET(newagg->aggkind))
/* If this aggregate can't share transition states, give up */
if (!sharable)
return -1;
foreach(lc, transnos)
......
......@@ -49,6 +49,7 @@
#include "utils/datum.h"
#include "utils/lsyscache.h"
#include "utils/memutils.h"
#include "utils/regproc.h"
#include "utils/syscache.h"
#include "windowapi.h"
......@@ -2096,10 +2097,12 @@ initialize_peragg(WindowAggState *winstate, WindowFunc *wfunc,
Oid aggtranstype;
AttrNumber initvalAttNo;
AclResult aclresult;
bool use_ma_code;
Oid transfn_oid,
invtransfn_oid,
finalfn_oid;
bool finalextra;
char finalmodify;
Expr *transfnexpr,
*invtransfnexpr,
*finalfnexpr;
......@@ -2125,20 +2128,32 @@ initialize_peragg(WindowAggState *winstate, WindowFunc *wfunc,
* Figure out whether we want to use the moving-aggregate implementation,
* and collect the right set of fields from the pg_attribute entry.
*
* If the frame head can't move, we don't need moving-aggregate code. Even
* if we'd like to use it, don't do so if the aggregate's arguments (and
* FILTER clause if any) contain any calls to volatile functions.
* Otherwise, the difference between restarting and not restarting the
* aggregation would be user-visible.
*/
if (OidIsValid(aggform->aggminvtransfn) &&
!(winstate->frameOptions & FRAMEOPTION_START_UNBOUNDED_PRECEDING) &&
!contain_volatile_functions((Node *) wfunc))
* It's possible that an aggregate would supply a safe moving-aggregate
* implementation and an unsafe normal one, in which case our hand is
* forced. Otherwise, if the frame head can't move, we don't need
* moving-aggregate code. Even if we'd like to use it, don't do so if the
* aggregate's arguments (and FILTER clause if any) contain any calls to
* volatile functions. Otherwise, the difference between restarting and
* not restarting the aggregation would be user-visible.
*/
if (!OidIsValid(aggform->aggminvtransfn))
use_ma_code = false; /* sine qua non */
else if (aggform->aggmfinalmodify == AGGMODIFY_READ_ONLY &&
aggform->aggfinalmodify != AGGMODIFY_READ_ONLY)
use_ma_code = true; /* decision forced by safety */
else if (winstate->frameOptions & FRAMEOPTION_START_UNBOUNDED_PRECEDING)
use_ma_code = false; /* non-moving frame head */
else if (contain_volatile_functions((Node *) wfunc))
use_ma_code = false; /* avoid possible behavioral change */
else
use_ma_code = true; /* yes, let's use it */
if (use_ma_code)
{
peraggstate->transfn_oid = transfn_oid = aggform->aggmtransfn;
peraggstate->invtransfn_oid = invtransfn_oid = aggform->aggminvtransfn;
peraggstate->finalfn_oid = finalfn_oid = aggform->aggmfinalfn;
finalextra = aggform->aggmfinalextra;
finalmodify = aggform->aggmfinalmodify;
aggtranstype = aggform->aggmtranstype;
initvalAttNo = Anum_pg_aggregate_aggminitval;
}
......@@ -2148,6 +2163,7 @@ initialize_peragg(WindowAggState *winstate, WindowFunc *wfunc,
peraggstate->invtransfn_oid = invtransfn_oid = InvalidOid;
peraggstate->finalfn_oid = finalfn_oid = aggform->aggfinalfn;
finalextra = aggform->aggfinalextra;
finalmodify = aggform->aggfinalmodify;
aggtranstype = aggform->aggtranstype;
initvalAttNo = Anum_pg_aggregate_agginitval;
}
......@@ -2198,6 +2214,17 @@ initialize_peragg(WindowAggState *winstate, WindowFunc *wfunc,
}
}
/*
* If the selected finalfn isn't read-only, we can't run this aggregate as
* a window function. This is a user-facing error, so we take a bit more
* care with the error message than elsewhere in this function.
*/
if (finalmodify != AGGMODIFY_READ_ONLY)
ereport(ERROR,
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("aggregate function %s does not support use as a window function",
format_procedure(wfunc->winfnoid))));
/* Detect how many arguments to pass to the finalfn */
if (finalextra)
peraggstate->numFinalArgs = numArguments + 1;
......
......@@ -42,6 +42,7 @@
#include "access/attnum.h"
#include "access/sysattr.h"
#include "access/transam.h"
#include "catalog/pg_aggregate.h"
#include "catalog/pg_am.h"
#include "catalog/pg_attribute.h"
#include "catalog/pg_cast.h"
......@@ -13433,8 +13434,10 @@ dumpAgg(Archive *fout, AggInfo *agginfo)
int i_aggmfinalfn;
int i_aggfinalextra;
int i_aggmfinalextra;
int i_aggfinalmodify;
int i_aggmfinalmodify;
int i_aggsortop;
int i_hypothetical;
int i_aggkind;
int i_aggtranstype;
int i_aggtransspace;
int i_aggmtranstype;
......@@ -13453,9 +13456,11 @@ dumpAgg(Archive *fout, AggInfo *agginfo)
const char *aggmfinalfn;
bool aggfinalextra;
bool aggmfinalextra;
char aggfinalmodify;
char aggmfinalmodify;
const char *aggsortop;
char *aggsortconvop;
bool hypothetical;
char aggkind;
const char *aggtranstype;
const char *aggtransspace;
const char *aggmtranstype;
......@@ -13464,6 +13469,7 @@ dumpAgg(Archive *fout, AggInfo *agginfo)
const char *aggminitval;
bool convertok;
const char *proparallel;
char defaultfinalmodify;
/* Skip if not to be dumped */
if (!agginfo->aggfn.dobj.dump || dopt->dataOnly)
......@@ -13479,15 +13485,37 @@ dumpAgg(Archive *fout, AggInfo *agginfo)
selectSourceSchema(fout, agginfo->aggfn.dobj.namespace->dobj.name);
/* Get aggregate-specific details */
if (fout->remoteVersion >= 90600)
if (fout->remoteVersion >= 110000)
{
appendPQExpBuffer(query, "SELECT aggtransfn, "
"aggfinalfn, aggtranstype::pg_catalog.regtype, "
"aggcombinefn, aggserialfn, aggdeserialfn, aggmtransfn, "
"aggminvtransfn, aggmfinalfn, aggmtranstype::pg_catalog.regtype, "
"aggfinalextra, aggmfinalextra, "
"aggfinalmodify, aggmfinalmodify, "
"aggsortop::pg_catalog.regoperator, "
"(aggkind = 'h') AS hypothetical, "
"aggkind, "
"aggtransspace, agginitval, "
"aggmtransspace, aggminitval, "
"true AS convertok, "
"pg_catalog.pg_get_function_arguments(p.oid) AS funcargs, "
"pg_catalog.pg_get_function_identity_arguments(p.oid) AS funciargs, "
"p.proparallel "
"FROM pg_catalog.pg_aggregate a, pg_catalog.pg_proc p "
"WHERE a.aggfnoid = p.oid "
"AND p.oid = '%u'::pg_catalog.oid",
agginfo->aggfn.dobj.catId.oid);
}
else if (fout->remoteVersion >= 90600)
{
appendPQExpBuffer(query, "SELECT aggtransfn, "
"aggfinalfn, aggtranstype::pg_catalog.regtype, "
"aggcombinefn, aggserialfn, aggdeserialfn, aggmtransfn, "
"aggminvtransfn, aggmfinalfn, aggmtranstype::pg_catalog.regtype, "
"aggfinalextra, aggmfinalextra, "
"'0' AS aggfinalmodify, '0' AS aggmfinalmodify, "
"aggsortop::pg_catalog.regoperator, "
"aggkind, "
"aggtransspace, agginitval, "
"aggmtransspace, aggminitval, "
"true AS convertok, "
......@@ -13507,8 +13535,9 @@ dumpAgg(Archive *fout, AggInfo *agginfo)
"'-' AS aggdeserialfn, aggmtransfn, aggminvtransfn, "
"aggmfinalfn, aggmtranstype::pg_catalog.regtype, "
"aggfinalextra, aggmfinalextra, "
"'0' AS aggfinalmodify, '0' AS aggmfinalmodify, "
"aggsortop::pg_catalog.regoperator, "
"(aggkind = 'h') AS hypothetical, "
"aggkind, "
"aggtransspace, agginitval, "
"aggmtransspace, aggminitval, "
"true AS convertok, "
......@@ -13528,8 +13557,9 @@ dumpAgg(Archive *fout, AggInfo *agginfo)
"'-' AS aggminvtransfn, '-' AS aggmfinalfn, "
"0 AS aggmtranstype, false AS aggfinalextra, "
"false AS aggmfinalextra, "
"'0' AS aggfinalmodify, '0' AS aggmfinalmodify, "
"aggsortop::pg_catalog.regoperator, "
"false AS hypothetical, "
"'n' AS aggkind, "
"0 AS aggtransspace, agginitval, "
"0 AS aggmtransspace, NULL AS aggminitval, "
"true AS convertok, "
......@@ -13549,8 +13579,9 @@ dumpAgg(Archive *fout, AggInfo *agginfo)
"'-' AS aggminvtransfn, '-' AS aggmfinalfn, "
"0 AS aggmtranstype, false AS aggfinalextra, "
"false AS aggmfinalextra, "
"'0' AS aggfinalmodify, '0' AS aggmfinalmodify, "
"aggsortop::pg_catalog.regoperator, "
"false AS hypothetical, "
"'n' AS aggkind, "
"0 AS aggtransspace, agginitval, "
"0 AS aggmtransspace, NULL AS aggminitval, "
"true AS convertok "
......@@ -13567,8 +13598,10 @@ dumpAgg(Archive *fout, AggInfo *agginfo)
"'-' AS aggdeserialfn, '-' AS aggmtransfn, "
"'-' AS aggminvtransfn, '-' AS aggmfinalfn, "
"0 AS aggmtranstype, false AS aggfinalextra, "
"false AS aggmfinalextra, 0 AS aggsortop, "
"false AS hypothetical, "
"false AS aggmfinalextra, "
"'0' AS aggfinalmodify, '0' AS aggmfinalmodify, "
"0 AS aggsortop, "
"'n' AS aggkind, "
"0 AS aggtransspace, agginitval, "
"0 AS aggmtransspace, NULL AS aggminitval, "
"true AS convertok "
......@@ -13590,8 +13623,10 @@ dumpAgg(Archive *fout, AggInfo *agginfo)
i_aggmfinalfn = PQfnumber(res, "aggmfinalfn");
i_aggfinalextra = PQfnumber(res, "aggfinalextra");
i_aggmfinalextra = PQfnumber(res, "aggmfinalextra");
i_aggfinalmodify = PQfnumber(res, "aggfinalmodify");
i_aggmfinalmodify = PQfnumber(res, "aggmfinalmodify");
i_aggsortop = PQfnumber(res, "aggsortop");
i_hypothetical = PQfnumber(res, "hypothetical");
i_aggkind = PQfnumber(res, "aggkind");
i_aggtranstype = PQfnumber(res, "aggtranstype");
i_aggtransspace = PQfnumber(res, "aggtransspace");
i_aggmtranstype = PQfnumber(res, "aggmtranstype");
......@@ -13611,8 +13646,10 @@ dumpAgg(Archive *fout, AggInfo *agginfo)
aggmfinalfn = PQgetvalue(res, 0, i_aggmfinalfn);
aggfinalextra = (PQgetvalue(res, 0, i_aggfinalextra)[0] == 't');
aggmfinalextra = (PQgetvalue(res, 0, i_aggmfinalextra)[0] == 't');
aggfinalmodify = PQgetvalue(res, 0, i_aggfinalmodify)[0];
aggmfinalmodify = PQgetvalue(res, 0, i_aggmfinalmodify)[0];
aggsortop = PQgetvalue(res, 0, i_aggsortop);
hypothetical = (PQgetvalue(res, 0, i_hypothetical)[0] == 't');
aggkind = PQgetvalue(res, 0, i_aggkind)[0];
aggtranstype = PQgetvalue(res, 0, i_aggtranstype);
aggtransspace = PQgetvalue(res, 0, i_aggtransspace);
aggmtranstype = PQgetvalue(res, 0, i_aggmtranstype);
......@@ -13656,6 +13693,14 @@ dumpAgg(Archive *fout, AggInfo *agginfo)
return;
}
/* identify default modify flag for aggkind (must match DefineAggregate) */
defaultfinalmodify = (aggkind == AGGKIND_NORMAL) ? AGGMODIFY_READ_ONLY : AGGMODIFY_READ_WRITE;
/* replace omitted flags for old versions */
if (aggfinalmodify == '0')
aggfinalmodify = defaultfinalmodify;
if (aggmfinalmodify == '0')
aggmfinalmodify = defaultfinalmodify;
/* regproc and regtype output is already sufficiently quoted */
appendPQExpBuffer(details, " SFUNC = %s,\n STYPE = %s",
aggtransfn, aggtranstype);
......@@ -13678,6 +13723,25 @@ dumpAgg(Archive *fout, AggInfo *agginfo)
aggfinalfn);
if (aggfinalextra)
appendPQExpBufferStr(details, ",\n FINALFUNC_EXTRA");
if (aggfinalmodify != defaultfinalmodify)
{
switch (aggfinalmodify)
{
case AGGMODIFY_READ_ONLY:
appendPQExpBufferStr(details, ",\n FINALFUNC_MODIFY = READ_ONLY");
break;
case AGGMODIFY_SHARABLE:
appendPQExpBufferStr(details, ",\n FINALFUNC_MODIFY = SHARABLE");
break;
case AGGMODIFY_READ_WRITE:
appendPQExpBufferStr(details, ",\n FINALFUNC_MODIFY = READ_WRITE");
break;
default:
exit_horribly(NULL, "unrecognized aggfinalmodify value for aggregate \"%s\"\n",
agginfo->aggfn.dobj.name);
break;
}
}
}
if (strcmp(aggcombinefn, "-") != 0)
......@@ -13715,6 +13779,25 @@ dumpAgg(Archive *fout, AggInfo *agginfo)
aggmfinalfn);
if (aggmfinalextra)
appendPQExpBufferStr(details, ",\n MFINALFUNC_EXTRA");
if (aggmfinalmodify != defaultfinalmodify)
{
switch (aggmfinalmodify)
{
case AGGMODIFY_READ_ONLY:
appendPQExpBufferStr(details, ",\n MFINALFUNC_MODIFY = READ_ONLY");
break;
case AGGMODIFY_SHARABLE:
appendPQExpBufferStr(details, ",\n MFINALFUNC_MODIFY = SHARABLE");
break;
case AGGMODIFY_READ_WRITE:
appendPQExpBufferStr(details, ",\n MFINALFUNC_MODIFY = READ_WRITE");
break;
default:
exit_horribly(NULL, "unrecognized aggmfinalmodify value for aggregate \"%s\"\n",
agginfo->aggfn.dobj.name);
break;
}
}
}
aggsortconvop = convertOperatorReference(fout, aggsortop);
......@@ -13725,7 +13808,7 @@ dumpAgg(Archive *fout, AggInfo *agginfo)
free(aggsortconvop);
}
if (hypothetical)
if (aggkind == AGGKIND_HYPOTHETICAL)
appendPQExpBufferStr(details, ",\n HYPOTHETICAL");
if (proparallel != NULL && proparallel[0] != PROPARALLEL_UNSAFE)
......
......@@ -2784,6 +2784,7 @@ qr/CREATE CAST \(timestamp with time zone AS interval\) WITH FUNCTION pg_catalog
basetype = int4,
stype = _int8,
finalfunc = int8_avg,
finalfunc_modify = sharable,
initcond1 = \'{0,0}\'
);',
regexp => qr/^
......@@ -2791,7 +2792,8 @@ qr/CREATE CAST \(timestamp with time zone AS interval\) WITH FUNCTION pg_catalog
\n\s+\QSFUNC = int4_avg_accum,\E
\n\s+\QSTYPE = bigint[],\E
\n\s+\QINITCOND = '{0,0}',\E
\n\s+\QFINALFUNC = int8_avg\E
\n\s+\QFINALFUNC = int8_avg,\E
\n\s+\QFINALFUNC_MODIFY = SHARABLE\E
\n\);/xm,
like => {
binary_upgrade => 1,
......
......@@ -53,6 +53,6 @@
*/
/* yyyymmddN */
#define CATALOG_VERSION_NO 201709301
#define CATALOG_VERSION_NO 201710141
#endif
This diff is collapsed.
/*-------------------------------------------------------------------------
*
* pg_aggregate_fn.h
* prototypes for functions in catalog/pg_aggregate.c
*
*
* Portions Copyright (c) 1996-2017, PostgreSQL Global Development Group
* Portions Copyright (c) 1994, Regents of the University of California
*
* src/include/catalog/pg_aggregate_fn.h
*
*-------------------------------------------------------------------------
*/
#ifndef PG_AGGREGATE_FN_H
#define PG_AGGREGATE_FN_H
#include "catalog/objectaddress.h"
#include "nodes/pg_list.h"
extern ObjectAddress AggregateCreate(const char *aggName,
Oid aggNamespace,
char aggKind,
int numArgs,
int numDirectArgs,
oidvector *parameterTypes,
Datum allParameterTypes,
Datum parameterModes,
Datum parameterNames,
List *parameterDefaults,
Oid variadicArgType,
List *aggtransfnName,
List *aggfinalfnName,
List *aggcombinefnName,
List *aggserialfnName,
List *aggdeserialfnName,
List *aggmtransfnName,
List *aggminvtransfnName,
List *aggmfinalfnName,
bool finalfnExtraArgs,
bool mfinalfnExtraArgs,
char finalfnModify,
char mfinalfnModify,
List *aggsortopName,
Oid aggTransType,
int32 aggTransSpace,
Oid aggmTransType,
int32 aggmTransSpace,
const char *agginitval,
const char *aggminitval,
char proparallel);
#endif /* PG_AGGREGATE_FN_H */
......@@ -71,7 +71,8 @@ create aggregate my_percentile_disc(float8 ORDER BY anyelement) (
stype = internal,
sfunc = ordered_set_transition,
finalfunc = percentile_disc_final,
finalfunc_extra = true
finalfunc_extra = true,
finalfunc_modify = read_write
);
create aggregate my_rank(VARIADIC "any" ORDER BY VARIADIC "any") (
stype = internal,
......@@ -146,15 +147,17 @@ CREATE AGGREGATE myavg (numeric)
finalfunc = numeric_avg,
serialfunc = numeric_avg_serialize,
deserialfunc = numeric_avg_deserialize,
combinefunc = numeric_avg_combine
combinefunc = numeric_avg_combine,
finalfunc_modify = sharable -- just to test a non-default setting
);
-- Ensure all these functions made it into the catalog
SELECT aggfnoid,aggtransfn,aggcombinefn,aggtranstype,aggserialfn,aggdeserialfn
SELECT aggfnoid, aggtransfn, aggcombinefn, aggtranstype::regtype,
aggserialfn, aggdeserialfn, aggfinalmodify
FROM pg_aggregate
WHERE aggfnoid = 'myavg'::REGPROC;
aggfnoid | aggtransfn | aggcombinefn | aggtranstype | aggserialfn | aggdeserialfn
----------+-------------------+---------------------+--------------+-----------------------+-------------------------
myavg | numeric_avg_accum | numeric_avg_combine | 2281 | numeric_avg_serialize | numeric_avg_deserialize
aggfnoid | aggtransfn | aggcombinefn | aggtranstype | aggserialfn | aggdeserialfn | aggfinalmodify
----------+-------------------+---------------------+--------------+-----------------------+-------------------------+----------------
myavg | numeric_avg_accum | numeric_avg_combine | internal | numeric_avg_serialize | numeric_avg_deserialize | s
(1 row)
DROP AGGREGATE myavg (numeric);
......
......@@ -1275,6 +1275,8 @@ WHERE aggfnoid = 0 OR aggtransfn = 0 OR
aggkind NOT IN ('n', 'o', 'h') OR
aggnumdirectargs < 0 OR
(aggkind = 'n' AND aggnumdirectargs > 0) OR
aggfinalmodify NOT IN ('r', 's', 'w') OR
aggmfinalmodify NOT IN ('r', 's', 'w') OR
aggtranstype = 0 OR aggtransspace < 0 OR aggmtransspace < 0;
ctid | aggfnoid
------+----------
......
......@@ -86,7 +86,8 @@ create aggregate my_percentile_disc(float8 ORDER BY anyelement) (
stype = internal,
sfunc = ordered_set_transition,
finalfunc = percentile_disc_final,
finalfunc_extra = true
finalfunc_extra = true,
finalfunc_modify = read_write
);
create aggregate my_rank(VARIADIC "any" ORDER BY VARIADIC "any") (
......@@ -161,11 +162,13 @@ CREATE AGGREGATE myavg (numeric)
finalfunc = numeric_avg,
serialfunc = numeric_avg_serialize,
deserialfunc = numeric_avg_deserialize,
combinefunc = numeric_avg_combine
combinefunc = numeric_avg_combine,
finalfunc_modify = sharable -- just to test a non-default setting
);
-- Ensure all these functions made it into the catalog
SELECT aggfnoid,aggtransfn,aggcombinefn,aggtranstype,aggserialfn,aggdeserialfn
SELECT aggfnoid, aggtransfn, aggcombinefn, aggtranstype::regtype,
aggserialfn, aggdeserialfn, aggfinalmodify
FROM pg_aggregate
WHERE aggfnoid = 'myavg'::REGPROC;
......
......@@ -795,6 +795,8 @@ WHERE aggfnoid = 0 OR aggtransfn = 0 OR
aggkind NOT IN ('n', 'o', 'h') OR
aggnumdirectargs < 0 OR
(aggkind = 'n' AND aggnumdirectargs > 0) OR
aggfinalmodify NOT IN ('r', 's', 'w') OR
aggmfinalmodify NOT IN ('r', 's', 'w') OR
aggtranstype = 0 OR aggtransspace < 0 OR aggmtransspace < 0;
-- Make sure the matching pg_proc entry is sensible, too.
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment