Commit be0ebb65 authored by Tom Lane's avatar Tom Lane

Allow the built-in ordered-set aggregates to share transition state.

The built-in OSAs all share the same transition function, so they can
share transition state as long as the final functions cooperate to not
do the sort step more than once.  To avoid running the tuplesort object
in randomAccess mode unnecessarily, add a bit of infrastructure to
nodeAgg.c to let the aggregate functions find out whether the transition
state is actually being shared or not.

This doesn't work for the hypothetical aggregates, since those inject
a hypothetical row that isn't traceable to the shared input state.
So they remain marked aggfinalmodify = 'w'.

Discussion: https://postgr.es/m/CAB4ELO5RZhOamuT9Xsf72ozbenDLLXZKSk07FiSVsuJNZB861A@mail.gmail.com
parent c3dfe0fe
......@@ -254,6 +254,11 @@ typedef struct AggStatePerTransData
*/
Aggref *aggref;
/*
* Is this state value actually being shared by more than one Aggref?
*/
bool aggshared;
/*
* Nominal number of arguments for aggregate function. For plain aggs,
* this excludes any ORDER BY expressions. For ordered-set aggs, this
......@@ -3360,9 +3365,10 @@ ExecInitAgg(Agg *node, EState *estate, int eflags)
{
/*
* Existing compatible trans found, so just point the 'peragg' to
* the same per-trans struct.
* the same per-trans struct, and mark the trans state as shared.
*/
pertrans = &pertransstates[existing_transno];
pertrans->aggshared = true;
peragg->transno = existing_transno;
}
else
......@@ -3512,6 +3518,7 @@ build_pertrans_for_aggref(AggStatePerTrans pertrans,
/* Begin filling in the pertrans data */
pertrans->aggref = aggref;
pertrans->aggshared = false;
pertrans->aggCollation = aggref->inputcollid;
pertrans->transfn_oid = aggtransfn;
pertrans->serialfn_oid = aggserialfn;
......@@ -4161,17 +4168,18 @@ AggGetAggref(FunctionCallInfo fcinfo)
{
if (fcinfo->context && IsA(fcinfo->context, AggState))
{
AggState *aggstate = (AggState *) fcinfo->context;
AggStatePerAgg curperagg;
AggStatePerTrans curpertrans;
/* check curperagg (valid when in a final function) */
curperagg = ((AggState *) fcinfo->context)->curperagg;
curperagg = aggstate->curperagg;
if (curperagg)
return curperagg->aggref;
/* check curpertrans (valid when in a transition function) */
curpertrans = ((AggState *) fcinfo->context)->curpertrans;
curpertrans = aggstate->curpertrans;
if (curpertrans)
return curpertrans->aggref;
......@@ -4201,6 +4209,44 @@ AggGetTempMemoryContext(FunctionCallInfo fcinfo)
return NULL;
}
/*
* AggStateIsShared - find out whether transition state is shared
*
* If the function is being called as an aggregate support function,
* return TRUE if the aggregate's transition state is shared across
* multiple aggregates, FALSE if it is not.
*
* Returns TRUE if not called as an aggregate support function.
* This is intended as a conservative answer, ie "no you'd better not
* scribble on your input". In particular, will return TRUE if the
* aggregate is being used as a window function, which is a scenario
* in which changing the transition state is a bad idea. We might
* want to refine the behavior for the window case in future.
*/
bool
AggStateIsShared(FunctionCallInfo fcinfo)
{
if (fcinfo->context && IsA(fcinfo->context, AggState))
{
AggState *aggstate = (AggState *) fcinfo->context;
AggStatePerAgg curperagg;
AggStatePerTrans curpertrans;
/* check curperagg (valid when in a final function) */
curperagg = aggstate->curperagg;
if (curperagg)
return aggstate->pertrans[curperagg->transno].aggshared;
/* check curpertrans (valid when in a transition function) */
curpertrans = aggstate->curpertrans;
if (curpertrans)
return curpertrans->aggshared;
}
return true;
}
/*
* AggRegisterCallback - register a cleanup callback for an aggregate
*
......
......@@ -40,14 +40,22 @@
* create just once per query because they will not change across groups.
* The per-query struct and subsidiary data live in the executor's per-query
* memory context, and go away implicitly at ExecutorEnd().
*
* These structs are set up during the first call of the transition function.
* Because we allow nodeAgg.c to merge ordered-set aggregates (but not
* hypothetical aggregates) with identical inputs and transition functions,
* this info must not depend on the particular aggregate (ie, particular
* final-function), nor on the direct argument(s) of the aggregate.
*/
typedef struct OSAPerQueryState
{
/* Aggref for this aggregate: */
/* Representative Aggref for this aggregate: */
Aggref *aggref;
/* Memory context containing this struct and other per-query data: */
MemoryContext qcontext;
/* Do we expect multiple final-function calls within one group? */
bool rescan_needed;
/* These fields are used only when accumulating tuples: */
......@@ -91,6 +99,8 @@ typedef struct OSAPerGroupState
Tuplesortstate *sortstate;
/* Number of normal rows inserted into sortstate: */
int64 number_of_rows;
/* Have we already done tuplesort_performsort? */
bool sort_done;
} OSAPerGroupState;
static void ordered_set_shutdown(Datum arg);
......@@ -146,6 +156,9 @@ ordered_set_startup(FunctionCallInfo fcinfo, bool use_tuples)
qstate->aggref = aggref;
qstate->qcontext = qcontext;
/* We need to support rescans if the trans state is shared */
qstate->rescan_needed = AggStateIsShared(fcinfo);
/* Extract the sort information */
sortlist = aggref->aggorder;
numSortCols = list_length(sortlist);
......@@ -277,15 +290,18 @@ ordered_set_startup(FunctionCallInfo fcinfo, bool use_tuples)
qstate->sortOperators,
qstate->sortCollations,
qstate->sortNullsFirsts,
work_mem, false);
work_mem,
qstate->rescan_needed);
else
osastate->sortstate = tuplesort_begin_datum(qstate->sortColType,
qstate->sortOperator,
qstate->sortCollation,
qstate->sortNullsFirst,
work_mem, false);
work_mem,
qstate->rescan_needed);
osastate->number_of_rows = 0;
osastate->sort_done = false;
/* Now register a shutdown callback to clean things up at end of group */
AggRegisterCallback(fcinfo,
......@@ -306,14 +322,12 @@ ordered_set_startup(FunctionCallInfo fcinfo, bool use_tuples)
* group) by ExecutorEnd. But we must take care to release any potential
* non-memory resources.
*
* This callback is arguably unnecessary, since we don't support use of
* ordered-set aggs in AGG_HASHED mode and there is currently no non-error
* code path in non-hashed modes wherein nodeAgg.c won't call the finalfn
* after calling the transfn one or more times. So in principle we could rely
* on the finalfn to delete the tuplestore etc. However, it's possible that
* such a code path might exist in future, and in any case it'd be
* notationally tedious and sometimes require extra data copying to ensure
* we always delete the tuplestore in the finalfn.
* In the case where we're not expecting multiple finalfn calls, we could
* arguably rely on the finalfn to clean up; but it's easier and more testable
* if we just do it the same way in either case. Note that many of the
* finalfns could *not* free the tuplesort object, at least not without extra
* data copying, because what they return is a pointer to a datum inside the
* tuplesort object.
*/
static void
ordered_set_shutdown(Datum arg)
......@@ -436,8 +450,14 @@ percentile_disc_final(PG_FUNCTION_ARGS)
if (osastate->number_of_rows == 0)
PG_RETURN_NULL();
/* Finish the sort */
/* Finish the sort, or rescan if we already did */
if (!osastate->sort_done)
{
tuplesort_performsort(osastate->sortstate);
osastate->sort_done = true;
}
else
tuplesort_rescan(osastate->sortstate);
/*----------
* We need the smallest K such that (K/N) >= percentile.
......@@ -457,13 +477,6 @@ percentile_disc_final(PG_FUNCTION_ARGS)
if (!tuplesort_getdatum(osastate->sortstate, true, &val, &isnull, NULL))
elog(ERROR, "missing row in percentile_disc");
/*
* Note: we *cannot* clean up the tuplesort object here, because the value
* to be returned is allocated inside its sortcontext. We could use
* datumCopy to copy it out of there, but it doesn't seem worth the
* trouble, since the cleanup callback will clear the tuplesort later.
*/
/* We shouldn't have stored any nulls, but do the right thing anyway */
if (isnull)
PG_RETURN_NULL();
......@@ -543,8 +556,14 @@ percentile_cont_final_common(FunctionCallInfo fcinfo,
Assert(expect_type == osastate->qstate->sortColType);
/* Finish the sort */
/* Finish the sort, or rescan if we already did */
if (!osastate->sort_done)
{
tuplesort_performsort(osastate->sortstate);
osastate->sort_done = true;
}
else
tuplesort_rescan(osastate->sortstate);
first_row = floor(percentile * (osastate->number_of_rows - 1));
second_row = ceil(percentile * (osastate->number_of_rows - 1));
......@@ -575,13 +594,6 @@ percentile_cont_final_common(FunctionCallInfo fcinfo,
val = lerpfunc(first_val, second_val, proportion);
}
/*
* Note: we *cannot* clean up the tuplesort object here, because the value
* to be returned may be allocated inside its sortcontext. We could use
* datumCopy to copy it out of there, but it doesn't seem worth the
* trouble, since the cleanup callback will clear the tuplesort later.
*/
PG_RETURN_DATUM(val);
}
......@@ -779,8 +791,14 @@ percentile_disc_multi_final(PG_FUNCTION_ARGS)
*/
if (i < num_percentiles)
{
/* Finish the sort */
/* Finish the sort, or rescan if we already did */
if (!osastate->sort_done)
{
tuplesort_performsort(osastate->sortstate);
osastate->sort_done = true;
}
else
tuplesort_rescan(osastate->sortstate);
for (; i < num_percentiles; i++)
{
......@@ -804,11 +822,6 @@ percentile_disc_multi_final(PG_FUNCTION_ARGS)
}
}
/*
* We could clean up the tuplesort object after forming the array, but
* probably not worth the trouble.
*/
/* We make the output array the same shape as the input */
PG_RETURN_POINTER(construct_md_array(result_datum, result_isnull,
ARR_NDIM(param),
......@@ -902,8 +915,14 @@ percentile_cont_multi_final_common(FunctionCallInfo fcinfo,
*/
if (i < num_percentiles)
{
/* Finish the sort */
/* Finish the sort, or rescan if we already did */
if (!osastate->sort_done)
{
tuplesort_performsort(osastate->sortstate);
osastate->sort_done = true;
}
else
tuplesort_rescan(osastate->sortstate);
for (; i < num_percentiles; i++)
{
......@@ -962,11 +981,6 @@ percentile_cont_multi_final_common(FunctionCallInfo fcinfo,
}
}
/*
* We could clean up the tuplesort object after forming the array, but
* probably not worth the trouble.
*/
/* We make the output array the same shape as the input */
PG_RETURN_POINTER(construct_md_array(result_datum, result_isnull,
ARR_NDIM(param),
......@@ -1043,8 +1057,14 @@ mode_final(PG_FUNCTION_ARGS)
shouldfree = !(osastate->qstate->typByVal);
/* Finish the sort */
/* Finish the sort, or rescan if we already did */
if (!osastate->sort_done)
{
tuplesort_performsort(osastate->sortstate);
osastate->sort_done = true;
}
else
tuplesort_rescan(osastate->sortstate);
/* Scan tuples and count frequencies */
while (tuplesort_getdatum(osastate->sortstate, true, &val, &isnull, &abbrev_val))
......@@ -1097,13 +1117,6 @@ mode_final(PG_FUNCTION_ARGS)
if (shouldfree && !last_val_is_mode)
pfree(DatumGetPointer(last_val));
/*
* Note: we *cannot* clean up the tuplesort object here, because the value
* to be returned is allocated inside its sortcontext. We could use
* datumCopy to copy it out of there, but it doesn't seem worth the
* trouble, since the cleanup callback will clear the tuplesort later.
*/
if (mode_freq)
PG_RETURN_DATUM(mode_val);
else
......@@ -1174,6 +1187,9 @@ hypothetical_rank_common(FunctionCallInfo fcinfo, int flag,
hypothetical_check_argtypes(fcinfo, nargs, osastate->qstate->tupdesc);
/* because we need a hypothetical row, we can't share transition state */
Assert(!osastate->sort_done);
/* insert the hypothetical row into the sort */
slot = osastate->qstate->tupslot;
ExecClearTuple(slot);
......@@ -1190,6 +1206,7 @@ hypothetical_rank_common(FunctionCallInfo fcinfo, int flag,
/* finish the sort */
tuplesort_performsort(osastate->sortstate);
osastate->sort_done = true;
/* iterate till we find the hypothetical row */
while (tuplesort_gettupleslot(osastate->sortstate, true, true, slot, NULL))
......@@ -1207,10 +1224,6 @@ hypothetical_rank_common(FunctionCallInfo fcinfo, int flag,
ExecClearTuple(slot);
/* Might as well clean up the tuplesort object immediately */
tuplesort_end(osastate->sortstate);
osastate->sortstate = NULL;
return rank;
}
......@@ -1329,6 +1342,9 @@ hypothetical_dense_rank_final(PG_FUNCTION_ARGS)
/* Get short-term context we can use for execTuplesMatch */
tmpcontext = AggGetTempMemoryContext(fcinfo);
/* because we need a hypothetical row, we can't share transition state */
Assert(!osastate->sort_done);
/* insert the hypothetical row into the sort */
slot = osastate->qstate->tupslot;
ExecClearTuple(slot);
......@@ -1345,6 +1361,7 @@ hypothetical_dense_rank_final(PG_FUNCTION_ARGS)
/* finish the sort */
tuplesort_performsort(osastate->sortstate);
osastate->sort_done = true;
/*
* We alternate fetching into tupslot and extraslot so that we have the
......@@ -1391,10 +1408,6 @@ hypothetical_dense_rank_final(PG_FUNCTION_ARGS)
ExecDropSingleTupleTableSlot(extraslot);
/* Might as well clean up the tuplesort object immediately */
tuplesort_end(osastate->sortstate);
osastate->sortstate = NULL;
rank = rank - duplicate_count;
PG_RETURN_INT64(rank);
......
......@@ -53,6 +53,6 @@
*/
/* yyyymmddN */
#define CATALOG_VERSION_NO 201710141
#define CATALOG_VERSION_NO 201710161
#endif
......@@ -318,13 +318,13 @@ DATA(insert ( 3267 n 0 jsonb_agg_transfn jsonb_agg_finalfn - - - - - -
DATA(insert ( 3270 n 0 jsonb_object_agg_transfn jsonb_object_agg_finalfn - - - - - - f f r r 0 2281 0 0 0 _null_ _null_ ));
/* ordered-set and hypothetical-set aggregates */
DATA(insert ( 3972 o 1 ordered_set_transition percentile_disc_final - - - - - - t f w w 0 2281 0 0 0 _null_ _null_ ));
DATA(insert ( 3974 o 1 ordered_set_transition percentile_cont_float8_final - - - - - - f f w w 0 2281 0 0 0 _null_ _null_ ));
DATA(insert ( 3976 o 1 ordered_set_transition percentile_cont_interval_final - - - - - - f f w w 0 2281 0 0 0 _null_ _null_ ));
DATA(insert ( 3978 o 1 ordered_set_transition percentile_disc_multi_final - - - - - - t f w w 0 2281 0 0 0 _null_ _null_ ));
DATA(insert ( 3980 o 1 ordered_set_transition percentile_cont_float8_multi_final - - - - - - f f w w 0 2281 0 0 0 _null_ _null_ ));
DATA(insert ( 3982 o 1 ordered_set_transition percentile_cont_interval_multi_final - - - - - - f f w w 0 2281 0 0 0 _null_ _null_ ));
DATA(insert ( 3984 o 0 ordered_set_transition mode_final - - - - - - t f w w 0 2281 0 0 0 _null_ _null_ ));
DATA(insert ( 3972 o 1 ordered_set_transition percentile_disc_final - - - - - - t f s s 0 2281 0 0 0 _null_ _null_ ));
DATA(insert ( 3974 o 1 ordered_set_transition percentile_cont_float8_final - - - - - - f f s s 0 2281 0 0 0 _null_ _null_ ));
DATA(insert ( 3976 o 1 ordered_set_transition percentile_cont_interval_final - - - - - - f f s s 0 2281 0 0 0 _null_ _null_ ));
DATA(insert ( 3978 o 1 ordered_set_transition percentile_disc_multi_final - - - - - - t f s s 0 2281 0 0 0 _null_ _null_ ));
DATA(insert ( 3980 o 1 ordered_set_transition percentile_cont_float8_multi_final - - - - - - f f s s 0 2281 0 0 0 _null_ _null_ ));
DATA(insert ( 3982 o 1 ordered_set_transition percentile_cont_interval_multi_final - - - - - - f f s s 0 2281 0 0 0 _null_ _null_ ));
DATA(insert ( 3984 o 0 ordered_set_transition mode_final - - - - - - t f s s 0 2281 0 0 0 _null_ _null_ ));
DATA(insert ( 3986 h 1 ordered_set_transition_multi rank_final - - - - - - t f w w 0 2281 0 0 0 _null_ _null_ ));
DATA(insert ( 3988 h 1 ordered_set_transition_multi percent_rank_final - - - - - - t f w w 0 2281 0 0 0 _null_ _null_ ));
DATA(insert ( 3990 h 1 ordered_set_transition_multi cume_dist_final - - - - - - t f w w 0 2281 0 0 0 _null_ _null_ ));
......
......@@ -698,6 +698,7 @@ extern int AggCheckCallContext(FunctionCallInfo fcinfo,
MemoryContext *aggcontext);
extern fmAggrefPtr AggGetAggref(FunctionCallInfo fcinfo);
extern MemoryContext AggGetTempMemoryContext(FunctionCallInfo fcinfo);
extern bool AggStateIsShared(FunctionCallInfo fcinfo);
extern void AggRegisterCallback(FunctionCallInfo fcinfo,
fmExprContextCallbackFunction func,
Datum arg);
......
......@@ -1866,7 +1866,7 @@ NOTICE: avg_transfn called with 3
2 | 6
(1 row)
-- ideally these would share state, but we have to fix the OSAs first.
-- exercise cases where OSAs share state
select
percentile_cont(0.5) within group (order by a),
percentile_disc(0.5) within group (order by a)
......@@ -1876,6 +1876,16 @@ from (values(1::float8),(3),(5),(7)) t(a);
4 | 3
(1 row)
select
percentile_cont(0.25) within group (order by a),
percentile_disc(0.5) within group (order by a)
from (values(1::float8),(3),(5),(7)) t(a);
percentile_cont | percentile_disc
-----------------+-----------------
2.5 | 3
(1 row)
-- these can't share state currently
select
rank(4) within group (order by a),
dense_rank(4) within group (order by a)
......
......@@ -741,12 +741,18 @@ select my_avg(one) filter (where one > 1),my_sum(one) from (values(1),(3)) t(one
-- this should not share the state due to different input columns.
select my_avg(one),my_sum(two) from (values(1,2),(3,4)) t(one,two);
-- ideally these would share state, but we have to fix the OSAs first.
-- exercise cases where OSAs share state
select
percentile_cont(0.5) within group (order by a),
percentile_disc(0.5) within group (order by a)
from (values(1::float8),(3),(5),(7)) t(a);
select
percentile_cont(0.25) within group (order by a),
percentile_disc(0.5) within group (order by a)
from (values(1::float8),(3),(5),(7)) t(a);
-- these can't share state currently
select
rank(4) within group (order by a),
dense_rank(4) within group (order by a)
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment