/*-------------------------------------------------------------------------
 *
 * nodeAgg.c
 *	  Routines to handle aggregate nodes.
 *
 *	  ExecAgg normally evaluates each aggregate in the following steps:
 *
 *		 transvalue = initcond
 *		 foreach input_tuple do
 *			transvalue = transfunc(transvalue, input_value(s))
 *		 result = finalfunc(transvalue, direct_argument(s))
 *
 *	  If a finalfunc is not supplied then the result is just the ending
 *	  value of transvalue.
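 *
 *	  For example (a sketch of catalog entries that live in pg_aggregate, not
 *	  in this file): avg(int8) uses int8_avg_accum as its transfunc, folding
 *	  each input into a state carrying a running count and sum, and
 *	  numeric_poly_avg as its finalfunc, which returns sum/count:
 *
 *		 transvalue = {count = 0, sum = 0}
 *		 foreach input_tuple do
 *			transvalue = {transvalue.count + 1, transvalue.sum + input_value}
 *		 result = transvalue.sum / transvalue.count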
 *
 *	  Other behaviors can be selected by the "aggsplit" mode, which exists
 *	  to support partial aggregation.  It is possible to:
 *	  * Skip running the finalfunc, so that the output is always the
 *	  final transvalue state.
 *	  * Substitute the combinefunc for the transfunc, so that transvalue
 *	  states (propagated up from a child partial-aggregation step) are merged
 *	  rather than processing raw input rows.  (The statements below about
 *	  the transfunc apply equally to the combinefunc, when it's selected.)
 *	  * Apply the serializefunc to the output values (this only makes sense
 *	  when skipping the finalfunc, since the serializefunc works on the
 *	  transvalue data type).
 *	  * Apply the deserializefunc to the input values (this only makes sense
 *	  when using the combinefunc, for similar reasons).
 *	  It is the planner's responsibility to connect up Agg nodes using these
 *	  alternate behaviors in a way that makes sense, with partial aggregation
 *	  results being fed to nodes that expect them.
 *
 *	  If a normal aggregate call specifies DISTINCT or ORDER BY, we sort the
 *	  input tuples and eliminate duplicates (if required) before performing
 *	  the above-depicted process.  (However, we don't do that for ordered-set
 *	  aggregates; their "ORDER BY" inputs are ordinary aggregate arguments
 *	  so far as this module is concerned.)	Note that partial aggregation
 *	  is not supported in these cases, since we couldn't ensure global
 *	  ordering or distinctness of the inputs.
 *
 *	  If transfunc is marked "strict" in pg_proc and initcond is NULL,
 *	  then the first non-NULL input_value is assigned directly to transvalue,
 *	  and transfunc isn't applied until the second non-NULL input_value.
 *	  The agg's first input type and transtype must be the same in this case!
 *
 *	  If transfunc is marked "strict" then NULL input_values are skipped,
 *	  keeping the previous transvalue.  If transfunc is not strict then it
 *	  is called for every input tuple and must deal with NULL initcond
 *	  or NULL input_values for itself.
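 *
 *	  max(int4) is a concrete case of this: it has no initcond, and its
 *	  transfunc int4larger is strict, so the first non-NULL input is adopted
 *	  as the transvalue directly (int4 is also the transtype), and
 *	  int4larger only runs from the second non-NULL input onward.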
 *
 *	  If finalfunc is marked "strict" then it is not called when the
 *	  ending transvalue is NULL, instead a NULL result is created
 *	  automatically (this is just the usual handling of strict functions,
 *	  of course).  A non-strict finalfunc can make its own choice of
 *	  what to return for a NULL ending transvalue.
 *
 *	  Ordered-set aggregates are treated specially in one other way: we
 *	  evaluate any "direct" arguments and pass them to the finalfunc along
 *	  with the transition value.
 *
 *	  A finalfunc can have additional arguments beyond the transvalue and
 *	  any "direct" arguments, corresponding to the input arguments of the
 *	  aggregate.  These are always just passed as NULL.  Such arguments may be
 *	  needed to allow resolution of a polymorphic aggregate's result type.
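 *
 *	  percentile_disc(float8) WITHIN GROUP (ORDER BY anyelement) is one such
 *	  case: its finalfunc receives the transition value, the float8 direct
 *	  argument, and a NULL placeholder of the ORDER BY input type, which is
 *	  what allows the anyelement result type to be resolved.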
 *
 *	  We compute aggregate input expressions and run the transition functions
 *	  in a temporary econtext (aggstate->tmpcontext).  This is reset at least
 *	  once per input tuple, so when the transvalue datatype is
 *	  pass-by-reference, we have to be careful to copy it into a longer-lived
 *	  memory context, and free the prior value to avoid memory leakage.  We
 *	  store transvalues in another set of econtexts, aggstate->aggcontexts
 *	  (one per grouping set, see below), which are also used for the hashtable
 *	  structures in AGG_HASHED mode.  These econtexts are rescanned, not just
 *	  reset, at group boundaries so that aggregate transition functions can
 *	  register shutdown callbacks via AggRegisterCallback.
 *
 *	  The node's regular econtext (aggstate->ss.ps.ps_ExprContext) is used to
 *	  run finalize functions and compute the output tuple; this context can be
 *	  reset once per output tuple.
 *
 *	  The executor's AggState node is passed as the fmgr "context" value in
 *	  all transfunc and finalfunc calls.  It is not recommended that the
 *	  transition functions look at the AggState node directly, but they can
 *	  use AggCheckCallContext() to verify that they are being called by
 *	  nodeAgg.c (and not as ordinary SQL functions).  The main reason a
 *	  transition function might want to know this is so that it can avoid
 *	  palloc'ing a fixed-size pass-by-ref transition value on every call:
 *	  it can instead just scribble on and return its left input.  Ordinarily
 *	  it is completely forbidden for functions to modify pass-by-ref inputs,
 *	  but in the aggregate case we know the left input is either the initial
 *	  transition value or a previous function result, and in either case its
 *	  value need not be preserved.  See int8inc() for an example.  Notice that
 *	  the EEOP_AGG_PLAIN_TRANS step is coded to avoid a data copy step when
 *	  the previous transition value pointer is returned.  It is also possible
 *	  to avoid repeated data copying when the transition value is an expanded
 *	  object: to do that, the transition function must take care to return
 *	  an expanded object that is in a child context of the memory context
 *	  returned by AggCheckCallContext().  Also, some transition functions want
 *	  to store working state in addition to the nominal transition value; they
 *	  can use the memory context returned by AggCheckCallContext() to do that.
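 *
 *	  A transition function might use this as follows (a hypothetical sketch,
 *	  not a function defined in this file; MyAvgState is an invented type):
 *
 *		Datum
 *		my_avg_trans(PG_FUNCTION_ARGS)
 *		{
 *			MyAvgState *state;
 *
 *			if (!AggCheckCallContext(fcinfo, NULL))
 *				elog(ERROR, "my_avg_trans called in non-aggregate context");
 *			// called by nodeAgg.c: arg 0 is the initial transvalue or our
 *			// own previous result, so we may scribble on it in place
 *			state = (MyAvgState *) PG_GETARG_POINTER(0);
 *			state->sum += PG_GETARG_FLOAT8(1);
 *			state->count++;
 *			PG_RETURN_POINTER(state);
 *		}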
 *
 *	  Note: AggCheckCallContext() is available as of PostgreSQL 9.0.  The
 *	  AggState is available as context in earlier releases (back to 8.1),
 *	  but direct examination of the node is needed to use it before 9.0.
 *
 *	  As of 9.4, aggregate transition functions can also use AggGetAggref()
 *	  to get hold of the Aggref expression node for their aggregate call.
 *	  This is mainly intended for ordered-set aggregates, which are not
 *	  supported as window functions.  (A regular aggregate function would
 *	  need some fallback logic to use this, since there's no Aggref node
 *	  for a window function.)
 *
 *	  Grouping sets:
 *
 *	  A list of grouping sets which is structurally equivalent to a ROLLUP
 *	  clause (e.g. (a,b,c), (a,b), (a)) can be processed in a single pass over
 *	  ordered data.  We do this by keeping a separate set of transition values
 *	  for each grouping set being concurrently processed; for each input tuple
 *	  we update them all, and on group boundaries we reset those states
 *	  (starting at the front of the list) whose grouping values have changed
 *	  (the list of grouping sets is ordered from most specific to least
 *	  specific).
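 *
 *	  For example (illustrative only), GROUP BY ROLLUP (a, b) keeps transition
 *	  values for the sets (a,b), (a) and () concurrently; when b changes we
 *	  finalize and reset only the (a,b) state, when a changes we also reset
 *	  the (a) state, and the () state keeps accumulating throughout.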
 *
 *	  Where more complex grouping sets are used, we break them down into
 *	  "phases", where each phase has a different sort order (except phase 0
 *	  which is reserved for hashing).  During each phase but the last, the
 *	  input tuples are additionally stored in a tuplesort which is keyed to the
 *	  next phase's sort order; during each phase but the first, the input
 *	  tuples are drawn from the previously sorted data.  (The sorting of the
 *	  data for the first phase is handled by the planner, as it might be
 *	  satisfied by underlying nodes.)
 *
 *	  Hashing can be mixed with sorted grouping.  To do this, we have an
 *	  AGG_MIXED strategy that populates the hashtables during the first sorted
 *	  phase, and switches to reading them out after completing all sort phases.
 *	  We can also support AGG_HASHED with multiple hash tables and no sorting
 *	  at all.
 *
 *	  From the perspective of aggregate transition and final functions, the
 *	  only issue regarding grouping sets is this: a single call site (flinfo)
 *	  of an aggregate function may be used for updating several different
 *	  transition values in turn. So the function must not cache in the flinfo
 *	  anything which logically belongs as part of the transition value (most
 *	  importantly, the memory context in which the transition value exists).
 *	  The support API functions (AggCheckCallContext, AggRegisterCallback) are
 *	  sensitive to the grouping set for which the aggregate function is
 *	  currently being called.
 *
 *	  Plan structure:
 *
 *	  What we get from the planner is actually one "real" Agg node which is
 *	  part of the plan tree proper, but which optionally has an additional list
 *	  of Agg nodes hung off the side via the "chain" field.  This is because an
 *	  Agg node happens to be a convenient representation of all the data we
 *	  need for grouping sets.
 *
 *	  For many purposes, we treat the "real" node as if it were just the first
 *	  node in the chain.  The chain must be ordered such that hashed entries
 *	  come before sorted/plain entries; the real node is marked AGG_MIXED if
 *	  there are both types present (in which case the real node describes one
 *	  of the hashed groupings, other AGG_HASHED nodes may optionally follow in
 *	  the chain, followed in turn by AGG_SORTED or (one) AGG_PLAIN node).  If
 *	  the real node is marked AGG_HASHED or AGG_SORTED, then all the chained
 *	  nodes must be of the same type; if it is AGG_PLAIN, there can be no
 *	  chained nodes.
 *
 *	  We collect all hashed nodes into a single "phase", numbered 0, and create
 *	  a sorted phase (numbered 1..n) for each AGG_SORTED or AGG_PLAIN node.
 *	  Phase 0 is allocated even if there are no hashes, but remains unused in
 *	  that case.
 *
 *	  AGG_HASHED nodes actually refer to only a single grouping set each,
 *	  because for each hashed grouping we need a separate grpColIdx and
 *	  numGroups estimate.  AGG_SORTED nodes represent a "rollup", a list of
 *	  grouping sets that share a sort order.  Each AGG_SORTED node other than
 *	  the first one has an associated Sort node which describes the sort order
 *	  to be used; the first sorted node takes its input from the outer subtree,
 *	  which the planner has already arranged to provide ordered data.
 *
 *	  Memory and ExprContext usage:
 *
 *	  Because we're accumulating aggregate values across input rows, we need to
 *	  use more memory contexts than just simple input/output tuple contexts.
 *	  In fact, for a rollup, we need a separate context for each grouping set
 *	  so that we can reset the inner (finer-grained) aggregates on their group
 *	  boundaries while continuing to accumulate values for outer
 *	  (coarser-grained) groupings.  On top of this, we might be simultaneously
 *	  populating hashtables; however, we only need one context for all the
 *	  hashtables.
 *
 *	  So we create an array, aggcontexts, with an ExprContext for each grouping
 *	  set in the largest rollup that we're going to process, and use the
 *	  per-tuple memory context of those ExprContexts to store the aggregate
 *	  transition values.  hashcontext is the single context created to support
 *	  all hash tables.
 *
 *	  Spilling To Disk
 *
 *	  When performing hash aggregation, if the hash table memory exceeds the
 *	  limit (see hash_agg_check_limits()), we enter "spill mode". In spill
 *	  mode, we advance the transition states only for groups already in the
 *	  hash table. For tuples that would need to create a new hash table
 *	  entry (and initialize a new transition state), we instead spill them to
 *	  disk to be processed later. The tuples are spilled in a partitioned
 *	  manner, so that subsequent batches are smaller and less likely to exceed
 *	  hash_mem (if a batch does exceed hash_mem, it must be spilled
 *	  recursively).
 *
 *	  Spilled data is written to logical tapes. These provide better control
 *	  over memory usage, disk space, and the number of files than if we were
 *	  to use a BufFile for each spill.
 *
 *	  Note that it's possible for transition states to start small but then
 *	  grow very large; for instance in the case of ARRAY_AGG. In such cases,
 *	  it's still possible to significantly exceed hash_mem. We try to avoid
 *	  this situation by estimating what will fit in the available memory, and
 *	  imposing a limit on the number of groups separately from the amount of
 *	  memory consumed.
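 *
 *	  As an illustration (with invented numbers): if hash_mem accommodates
 *	  roughly 40000 groups and the input contains more, tuples belonging to
 *	  groups already in the hash table still advance their transition states,
 *	  while tuples of unseen groups are routed by their hash bits to one of
 *	  several spill partitions, each of which later becomes its own, smaller
 *	  batch.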
 *
 *    Transition / Combine function invocation:
 *
 *    For performance reasons transition functions, including combine
 *    functions, aren't invoked one-by-one from nodeAgg.c after computing
 *    arguments using the expression evaluation engine. Instead
 *    ExecBuildAggTrans() builds one large expression that does both argument
 *    evaluation and transition function invocation. That avoids performance
 *    issues due to repeated uses of expression evaluation, complications due
 *    to filter expressions having to be evaluated early, and allows the
 *    entire expression to be JIT-compiled into one native function.
 *
 * Portions Copyright (c) 1996-2021, PostgreSQL Global Development Group
 * Portions Copyright (c) 1994, Regents of the University of California
 *
 * IDENTIFICATION
 *	  src/backend/executor/nodeAgg.c
 *
 *-------------------------------------------------------------------------
 */

#include "postgres.h"

#include "access/htup_details.h"
#include "access/parallel.h"
#include "catalog/objectaccess.h"
#include "catalog/pg_aggregate.h"
#include "catalog/pg_proc.h"
#include "catalog/pg_type.h"
#include "common/hashfn.h"
#include "executor/execExpr.h"
#include "executor/executor.h"
#include "executor/nodeAgg.h"
#include "lib/hyperloglog.h"
#include "miscadmin.h"
#include "nodes/makefuncs.h"
#include "nodes/nodeFuncs.h"
#include "optimizer/optimizer.h"
#include "parser/parse_agg.h"
#include "parser/parse_coerce.h"
#include "utils/acl.h"
#include "utils/builtins.h"
#include "utils/datum.h"
#include "utils/dynahash.h"
#include "utils/expandeddatum.h"
#include "utils/logtape.h"
#include "utils/lsyscache.h"
#include "utils/memutils.h"
#include "utils/syscache.h"
#include "utils/tuplesort.h"

/*
 * Control how many partitions are created when spilling HashAgg to
 * disk.
 *
 * HASHAGG_PARTITION_FACTOR is multiplied by the estimated number of
 * partitions needed such that each partition will fit in memory. The factor
 * is set higher than one because there's not a high cost to having a few too
 * many partitions, and it makes it less likely that a partition will need to
 * be spilled recursively. Another benefit of having more, smaller partitions
 * is that small hash tables may perform better than large ones due to memory
 * caching effects.
 *
 * We also specify a min and max number of partitions per spill. Too few might
 * mean a lot of wasted I/O from repeated spilling of the same tuples. Too
 * many will result in lots of memory wasted buffering the spill files (which
 * could instead be spent on a larger hash table).
 */
#define HASHAGG_PARTITION_FACTOR 1.50
#define HASHAGG_MIN_PARTITIONS 4
#define HASHAGG_MAX_PARTITIONS 1024
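
/*
 * Roughly how these knobs combine (a sketch only; the real logic lives in
 * hash_choose_num_partitions(), which also rounds the result to a power of
 * two and caps the memory used for partition buffers):
 *
 *		partitions = (input_groups * hashentrysize / hash_mem) *
 *					 HASHAGG_PARTITION_FACTOR;
 *		npartitions = Min(Max(partitions, HASHAGG_MIN_PARTITIONS),
 *						  HASHAGG_MAX_PARTITIONS);
 */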

/*
 * For reading from tapes, the buffer size must be a multiple of
 * BLCKSZ. Larger values help when reading from multiple tapes concurrently,
 * but that doesn't happen in HashAgg, so we simply use BLCKSZ. Writing to a
 * tape always uses a buffer of size BLCKSZ.
 */
#define HASHAGG_READ_BUFFER_SIZE BLCKSZ
#define HASHAGG_WRITE_BUFFER_SIZE BLCKSZ

/*
 * HyperLogLog is used for estimating the cardinality of the spilled tuples in
 * a given partition. 5 bits corresponds to a size of about 32 bytes and a
 * worst-case error of around 18%. That's effective enough to choose a
 * reasonable number of partitions when recursing.
 */
#define HASHAGG_HLL_BIT_WIDTH 5

/*
 * Estimate chunk overhead as a constant 16 bytes. XXX: should this be
 * improved?
 */
#define CHUNKHDRSZ 16

/*
 * Track all tapes needed for a HashAgg that spills. We don't know the maximum
 * number of tapes needed at the start of the algorithm (because it can
 * recurse), so one tape set is allocated and extended as needed for new
 * tapes. Once a tape has been fully read, it is rewound for writing and put
 * on the free list.
 *
 * Tapes' buffers can take up substantial memory when many tapes are open at
 * once. We only need one tape open at a time in read mode (using a buffer
 * that's a multiple of BLCKSZ); but we need one tape open in write mode (each
 * requiring a buffer of size BLCKSZ) for each partition.
 */
typedef struct HashTapeInfo
{
	LogicalTapeSet *tapeset;
	int			ntapes;
	int		   *freetapes;
	int			nfreetapes;
	int			freetapes_alloc;
} HashTapeInfo;

/*
 * Represents partitioned spill data for a single hashtable. Contains the
 * necessary information to route tuples to the correct partition, and to
 * transform the spilled data into new batches.
 *
 * The high bits are used for partition selection (when recursing, we ignore
 * the bits that have already been used for partition selection at an earlier
 * level).
 */
typedef struct HashAggSpill
{
	LogicalTapeSet *tapeset;	/* borrowed reference to tape set */
	int			npartitions;	/* number of partitions */
	int		   *partitions;		/* spill partition tape numbers */
	int64	   *ntuples;		/* number of tuples in each partition */
	uint32		mask;			/* mask to find partition from hash value */
	int			shift;			/* after masking, shift by this amount */
	hyperLogLogState *hll_card; /* cardinality estimate for contents */
} HashAggSpill;
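
/*
 * The mask/shift fields above imply a partition choice of the form (a
 * sketch; see hashagg_spill_tuple() for the real code):
 *
 *		partition = (hash & spill->mask) >> spill->shift;
 *
 * so each recursion level consumes a fresh slice of the hash bits.
 */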

/*
 * Represents work to be done for one pass of hash aggregation (with only one
 * grouping set).
 *
 * Also tracks the bits of the hash already used for partition selection by
 * earlier iterations, so that this batch can use new bits. If all bits have
 * already been used, no partitioning will be done (any spilled data will go
 * to a single output tape).
 */
typedef struct HashAggBatch
{
	int			setno;			/* grouping set */
	int			used_bits;		/* number of bits of hash already used */
	LogicalTapeSet *tapeset;	/* borrowed reference to tape set */
	int			input_tapenum;	/* input partition tape */
	int64		input_tuples;	/* number of tuples in this batch */
	double		input_card;		/* estimated group cardinality */
} HashAggBatch;

/* used to find referenced colnos */
typedef struct FindColsContext
{
	bool		is_aggref;		/* is under an aggref */
	Bitmapset  *aggregated;		/* column references under an aggref */
	Bitmapset  *unaggregated;	/* other column references */
} FindColsContext;

static void select_current_set(AggState *aggstate, int setno, bool is_hash);
static void initialize_phase(AggState *aggstate, int newphase);
static TupleTableSlot *fetch_input_tuple(AggState *aggstate);
static void initialize_aggregates(AggState *aggstate,
								  AggStatePerGroup *pergroups,
								  int numReset);
static void advance_transition_function(AggState *aggstate,
										AggStatePerTrans pertrans,
										AggStatePerGroup pergroupstate);
static void advance_aggregates(AggState *aggstate);
static void process_ordered_aggregate_single(AggState *aggstate,
											 AggStatePerTrans pertrans,
											 AggStatePerGroup pergroupstate);
static void process_ordered_aggregate_multi(AggState *aggstate,
											AggStatePerTrans pertrans,
											AggStatePerGroup pergroupstate);
static void finalize_aggregate(AggState *aggstate,
							   AggStatePerAgg peragg,
							   AggStatePerGroup pergroupstate,
							   Datum *resultVal, bool *resultIsNull);
static void finalize_partialaggregate(AggState *aggstate,
									  AggStatePerAgg peragg,
									  AggStatePerGroup pergroupstate,
									  Datum *resultVal, bool *resultIsNull);
static inline void prepare_hash_slot(AggStatePerHash perhash,
									 TupleTableSlot *inputslot,
									 TupleTableSlot *hashslot);
static void prepare_projection_slot(AggState *aggstate,
									TupleTableSlot *slot,
									int currentSet);
static void finalize_aggregates(AggState *aggstate,
								AggStatePerAgg peragg,
								AggStatePerGroup pergroup);
static TupleTableSlot *project_aggregates(AggState *aggstate);
static void find_cols(AggState *aggstate, Bitmapset **aggregated,
					  Bitmapset **unaggregated);
static bool find_cols_walker(Node *node, FindColsContext *context);
static void build_hash_tables(AggState *aggstate);
static void build_hash_table(AggState *aggstate, int setno, long nbuckets);
static void hashagg_recompile_expressions(AggState *aggstate, bool minslot,
										  bool nullcheck);
static long hash_choose_num_buckets(double hashentrysize,
									long estimated_nbuckets,
									Size memory);
static int	hash_choose_num_partitions(double input_groups,
									   double hashentrysize,
									   int used_bits,
									   int *log2_npartitions);
static void initialize_hash_entry(AggState *aggstate,
								  TupleHashTable hashtable,
								  TupleHashEntry entry);
static void lookup_hash_entries(AggState *aggstate);
static TupleTableSlot *agg_retrieve_direct(AggState *aggstate);
static void agg_fill_hash_table(AggState *aggstate);
static bool agg_refill_hash_table(AggState *aggstate);
static TupleTableSlot *agg_retrieve_hash_table(AggState *aggstate);
static TupleTableSlot *agg_retrieve_hash_table_in_memory(AggState *aggstate);
static void hash_agg_check_limits(AggState *aggstate);
static void hash_agg_enter_spill_mode(AggState *aggstate);
static void hash_agg_update_metrics(AggState *aggstate, bool from_tape,
									int npartitions);
static void hashagg_finish_initial_spills(AggState *aggstate);
static void hashagg_reset_spill_state(AggState *aggstate);
static HashAggBatch *hashagg_batch_new(LogicalTapeSet *tapeset,
									   int input_tapenum, int setno,
									   int64 input_tuples, double input_card,
									   int used_bits);
static MinimalTuple hashagg_batch_read(HashAggBatch *batch, uint32 *hashp);
static void hashagg_spill_init(HashAggSpill *spill, HashTapeInfo *tapeinfo,
							   int used_bits, double input_groups,
							   double hashentrysize);
static Size hashagg_spill_tuple(AggState *aggstate, HashAggSpill *spill,
								TupleTableSlot *slot, uint32 hash);
static void hashagg_spill_finish(AggState *aggstate, HashAggSpill *spill,
								 int setno);
static void hashagg_tapeinfo_init(AggState *aggstate);
static void hashagg_tapeinfo_assign(HashTapeInfo *tapeinfo, int *dest,
									int ndest);
static void hashagg_tapeinfo_release(HashTapeInfo *tapeinfo, int tapenum);
static Datum GetAggInitVal(Datum textInitVal, Oid transtype);
static void build_pertrans_for_aggref(AggStatePerTrans pertrans,
									  AggState *aggstate, EState *estate,
									  Aggref *aggref, Oid aggtransfn, Oid aggtranstype,
									  Oid aggserialfn, Oid aggdeserialfn,
									  Datum initValue, bool initValueIsNull,
									  Oid *inputTypes, int numArguments);


/*
 * Select the current grouping set; affects current_set and
 * curaggcontext.
 */
static void
select_current_set(AggState *aggstate, int setno, bool is_hash)
{
	/*
	 * When changing this, also adapt ExecAggPlainTransByVal() and
	 * ExecAggPlainTransByRef().
	 */
	if (is_hash)
		aggstate->curaggcontext = aggstate->hashcontext;
	else
		aggstate->curaggcontext = aggstate->aggcontexts[setno];

	aggstate->current_set = setno;
}

/*
 * Switch to phase "newphase", which must either be 0 or 1 (to reset) or
 * current_phase + 1. Juggle the tuplesorts accordingly.
 *
 * Phase 0 is for hashing, which we currently handle last in the AGG_MIXED
 * case, so when entering phase 0, all we need to do is drop open sorts.
 */
static void
initialize_phase(AggState *aggstate, int newphase)
{
	Assert(newphase <= 1 || newphase == aggstate->current_phase + 1);

	/*
	 * Whatever the previous state, we're now done with whatever input
	 * tuplesort was in use.
	 */
	if (aggstate->sort_in)
	{
		tuplesort_end(aggstate->sort_in);
		aggstate->sort_in = NULL;
	}

	if (newphase <= 1)
	{
		/*
		 * Discard any existing output tuplesort.
		 */
		if (aggstate->sort_out)
		{
			tuplesort_end(aggstate->sort_out);
			aggstate->sort_out = NULL;
		}
	}
	else
	{
		/*
		 * The old output tuplesort becomes the new input one, and this is the
		 * right time to actually sort it.
		 */
		aggstate->sort_in = aggstate->sort_out;
		aggstate->sort_out = NULL;
		Assert(aggstate->sort_in);
		tuplesort_performsort(aggstate->sort_in);
	}

	/*
	 * If this isn't the last phase, we need to sort appropriately for the
	 * next phase in sequence.
	 */
	if (newphase > 0 && newphase < aggstate->numphases - 1)
	{
		Sort	   *sortnode = aggstate->phases[newphase + 1].sortnode;
		PlanState  *outerNode = outerPlanState(aggstate);
		TupleDesc	tupDesc = ExecGetResultType(outerNode);

		aggstate->sort_out = tuplesort_begin_heap(tupDesc,
												  sortnode->numCols,
												  sortnode->sortColIdx,
												  sortnode->sortOperators,
												  sortnode->collations,
												  sortnode->nullsFirst,
												  work_mem,
												  NULL, false);
	}

	aggstate->current_phase = newphase;
	aggstate->phase = &aggstate->phases[newphase];
}

/*
 * Fetch a tuple from either the outer plan (for phase 1) or from the sorter
 * populated by the previous phase.  Copy it to the sorter for the next phase
 * if any.
 *
 * Callers cannot rely on memory for tuple in returned slot remaining valid
 * past any subsequently fetched tuple.
 */
static TupleTableSlot *
fetch_input_tuple(AggState *aggstate)
{
	TupleTableSlot *slot;

	if (aggstate->sort_in)
	{
		/* make sure we check for interrupts in either path through here */
		CHECK_FOR_INTERRUPTS();
		if (!tuplesort_gettupleslot(aggstate->sort_in, true, false,
									aggstate->sort_slot, NULL))
			return NULL;
		slot = aggstate->sort_slot;
	}
	else
		slot = ExecProcNode(outerPlanState(aggstate));

	if (!TupIsNull(slot) && aggstate->sort_out)
		tuplesort_puttupleslot(aggstate->sort_out, slot);

	return slot;
}

/*
 * (Re)Initialize an individual aggregate.
 *
 * This function handles only one grouping set, already set in
 * aggstate->current_set.
 *
 * When called, CurrentMemoryContext should be the per-query context.
 */
static void
initialize_aggregate(AggState *aggstate, AggStatePerTrans pertrans,
					 AggStatePerGroup pergroupstate)
{
	/*
	 * Start a fresh sort operation for each DISTINCT/ORDER BY aggregate.
	 */
	if (pertrans->numSortCols > 0)
	{
		/*
		 * In case of rescan, maybe there could be an uncompleted sort
		 * operation?  Clean it up if so.
		 */
		if (pertrans->sortstates[aggstate->current_set])
			tuplesort_end(pertrans->sortstates[aggstate->current_set]);


		/*
		 * We use a plain Datum sorter when there's a single input column;
		 * otherwise sort the full tuple.  (See comments for
		 * process_ordered_aggregate_single.)
		 */
		if (pertrans->numInputs == 1)
		{
			Form_pg_attribute attr = TupleDescAttr(pertrans->sortdesc, 0);

			pertrans->sortstates[aggstate->current_set] =
				tuplesort_begin_datum(attr->atttypid,
									  pertrans->sortOperators[0],
									  pertrans->sortCollations[0],
									  pertrans->sortNullsFirst[0],
									  work_mem, NULL, false);
		}
		else
			pertrans->sortstates[aggstate->current_set] =
				tuplesort_begin_heap(pertrans->sortdesc,
									 pertrans->numSortCols,
									 pertrans->sortColIdx,
									 pertrans->sortOperators,
									 pertrans->sortCollations,
									 pertrans->sortNullsFirst,
									 work_mem, NULL, false);
	}

	/*
	 * (Re)set transValue to the initial value.
	 *
	 * Note that when the initial value is pass-by-ref, we must copy it (into
	 * the aggcontext) since we will pfree the transValue later.
	 */
	if (pertrans->initValueIsNull)
		pergroupstate->transValue = pertrans->initValue;
	else
	{
		MemoryContext oldContext;

		oldContext = MemoryContextSwitchTo(aggstate->curaggcontext->ecxt_per_tuple_memory);
		pergroupstate->transValue = datumCopy(pertrans->initValue,
											  pertrans->transtypeByVal,
											  pertrans->transtypeLen);
		MemoryContextSwitchTo(oldContext);
	}
	pergroupstate->transValueIsNull = pertrans->initValueIsNull;

	/*
	 * If the initial value for the transition state doesn't exist in the
	 * pg_aggregate table then we will let the first non-NULL value returned
	 * from the outer procNode become the initial value. (This is useful for
	 * aggregates like max() and min().) The noTransValue flag signals that we
	 * still need to do this.
	 */
	pergroupstate->noTransValue = pertrans->initValueIsNull;
}

/*
 * Initialize all aggregate transition states for a new group of input values.
 *
 * If there are multiple grouping sets, we initialize only the first numReset
 * of them (the grouping sets are ordered so that the most specific one, which
 * is reset most often, is first). As a convenience, if numReset is 0, we
 * reinitialize all sets.
 *
 * NB: This cannot be used for hash aggregates, as for those the grouping set
 * number has to be specified from further up.
 *
 * When called, CurrentMemoryContext should be the per-query context.
 */
static void
initialize_aggregates(AggState *aggstate,
					  AggStatePerGroup *pergroups,
					  int numReset)
{
	int			transno;
	int			numGroupingSets = Max(aggstate->phase->numsets, 1);
	int			setno = 0;
	int			numTrans = aggstate->numtrans;
	AggStatePerTrans transstates = aggstate->pertrans;

	if (numReset == 0)
		numReset = numGroupingSets;

	for (setno = 0; setno < numReset; setno++)
	{
		AggStatePerGroup pergroup = pergroups[setno];

		select_current_set(aggstate, setno, false);

		for (transno = 0; transno < numTrans; transno++)
		{
			AggStatePerTrans pertrans = &transstates[transno];
			AggStatePerGroup pergroupstate = &pergroup[transno];

			initialize_aggregate(aggstate, pertrans, pergroupstate);
		}
	}
}

/*
 * Given new input value(s), advance the transition function of one aggregate
 * state within one grouping set only (already set in aggstate->current_set)
 *
 * The new values (and null flags) have been preloaded into argument positions
 * 1 and up in pertrans->transfn_fcinfo, so that we needn't copy them again to
 * pass to the transition function.  We also expect that the static fields of
 * the fcinfo are already initialized; that was done by ExecInitAgg().
 *
 * It doesn't matter which memory context this is called in.
 */
static void
advance_transition_function(AggState *aggstate,
							AggStatePerTrans pertrans,
							AggStatePerGroup pergroupstate)
{
	FunctionCallInfo fcinfo = pertrans->transfn_fcinfo;
	MemoryContext oldContext;
	Datum		newVal;

	if (pertrans->transfn.fn_strict)
	{
		/*
		 * For a strict transfn, nothing happens when there's a NULL input; we
		 * just keep the prior transValue.
		 */
		int			numTransInputs = pertrans->numTransInputs;
		int			i;

		for (i = 1; i <= numTransInputs; i++)
		{
			if (fcinfo->args[i].isnull)
				return;
		}
		if (pergroupstate->noTransValue)
		{
			/*
			 * transValue has not been initialized. This is the first non-NULL
			 * input value. We use it as the initial value for transValue. (We
			 * already checked that the agg's input type is binary-compatible
			 * with its transtype, so straight copy here is OK.)
			 *
			 * We must copy the datum into aggcontext if it is pass-by-ref. We
			 * do not need to pfree the old transValue, since it's NULL.
			 */
			oldContext = MemoryContextSwitchTo(aggstate->curaggcontext->ecxt_per_tuple_memory);
			pergroupstate->transValue = datumCopy(fcinfo->args[1].value,
												  pertrans->transtypeByVal,
												  pertrans->transtypeLen);
			pergroupstate->transValueIsNull = false;
			pergroupstate->noTransValue = false;
			MemoryContextSwitchTo(oldContext);
			return;
		}
		if (pergroupstate->transValueIsNull)
		{
			/*
			 * Don't call a strict function with NULL inputs.  Note it is
			 * possible to get here despite the above tests, if the transfn is
			 * strict *and* returned a NULL on a prior cycle. If that happens
			 * we will propagate the NULL all the way to the end.
			 */
			return;
		}
	}

	/* We run the transition functions in per-input-tuple memory context */
	oldContext = MemoryContextSwitchTo(aggstate->tmpcontext->ecxt_per_tuple_memory);

	/* set up aggstate->curpertrans for AggGetAggref() */
	aggstate->curpertrans = pertrans;

	/*
	 * OK to call the transition function
	 */
	fcinfo->args[0].value = pergroupstate->transValue;
	fcinfo->args[0].isnull = pergroupstate->transValueIsNull;
	fcinfo->isnull = false;		/* just in case transfn doesn't set it */

	newVal = FunctionCallInvoke(fcinfo);

	aggstate->curpertrans = NULL;

	/*
	 * If pass-by-ref datatype, must copy the new value into aggcontext and
	 * free the prior transValue.  But if transfn returned a pointer to its
	 * first input, we don't need to do anything.  Also, if transfn returned a
	 * pointer to a R/W expanded object that is already a child of the
	 * aggcontext, assume we can adopt that value without copying it.
	 *
	 * It's safe to compare newVal with pergroup->transValue without regard
	 * for either being NULL, because ExecAggTransReparent() takes care to set
	 * transValue to 0 when NULL. Otherwise we could end up accidentally not
	 * reparenting, when the transValue has the same numerical value as
	 * newValue, despite being NULL.  This is a somewhat hot path, making it
	 * undesirable to instead solve this with another branch for the common
	 * case of the transition function returning its (modified) input
	 * argument.
	 */
	if (!pertrans->transtypeByVal &&
		DatumGetPointer(newVal) != DatumGetPointer(pergroupstate->transValue))
		newVal = ExecAggTransReparent(aggstate, pertrans,
									  newVal, fcinfo->isnull,
									  pergroupstate->transValue,
									  pergroupstate->transValueIsNull);

	pergroupstate->transValue = newVal;
	pergroupstate->transValueIsNull = fcinfo->isnull;

	MemoryContextSwitchTo(oldContext);
}

/*
 * Advance each aggregate transition state for one input tuple.  The input
 * tuple has been stored in tmpcontext->ecxt_outertuple, so that it is
 * accessible to ExecEvalExpr.
 *
 * We have two sets of transition states to handle: one for sorted aggregation
 * and one for hashed; we do them both here, to avoid multiple evaluation of
 * the inputs.
 *
 * When called, CurrentMemoryContext should be the per-query context.
 */
static void
advance_aggregates(AggState *aggstate)
{
	bool		dummynull;

	ExecEvalExprSwitchContext(aggstate->phase->evaltrans,
							  aggstate->tmpcontext,
							  &dummynull);
}

/*
 * Run the transition function for a DISTINCT or ORDER BY aggregate
 * with only one input.  This is called after we have completed
 * entering all the input values into the sort object.  We complete the
 * sort, read out the values in sorted order, and run the transition
 * function on each value (applying DISTINCT if appropriate).
 *
 * Note that the strictness of the transition function was checked when
 * entering the values into the sort, so we don't check it again here;
 * we just apply standard SQL DISTINCT logic.
 *
 * The one-input case is handled separately from the multi-input case
 * for performance reasons: for single by-value inputs, such as the
 * common case of count(distinct id), the tuplesort_getdatum code path
 * is around 300% faster.  (The speedup for by-reference types is less
 * but still noticeable.)
 *
 * This function handles only one grouping set (already set in
 * aggstate->current_set).
 *
 * When called, CurrentMemoryContext should be the per-query context.
 */
static void
process_ordered_aggregate_single(AggState *aggstate,
								 AggStatePerTrans pertrans,
								 AggStatePerGroup pergroupstate)
{
	Datum		oldVal = (Datum) 0;
	bool		oldIsNull = true;
	bool		haveOldVal = false;
	MemoryContext workcontext = aggstate->tmpcontext->ecxt_per_tuple_memory;
	MemoryContext oldContext;
	bool		isDistinct = (pertrans->numDistinctCols > 0);
	Datum		newAbbrevVal = (Datum) 0;
	Datum		oldAbbrevVal = (Datum) 0;
	FunctionCallInfo fcinfo = pertrans->transfn_fcinfo;
	Datum	   *newVal;
	bool	   *isNull;

	Assert(pertrans->numDistinctCols < 2);

	tuplesort_performsort(pertrans->sortstates[aggstate->current_set]);

	/* Load the column into argument 1 (arg 0 will be transition value) */
	newVal = &fcinfo->args[1].value;
	isNull = &fcinfo->args[1].isnull;

	/*
	 * Note: if input type is pass-by-ref, the datums returned by the sort are
	 * freshly palloc'd in the per-query context, so we must be careful to
	 * pfree them when they are no longer needed.
	 */

	while (tuplesort_getdatum(pertrans->sortstates[aggstate->current_set],
							  true, newVal, isNull, &newAbbrevVal))
	{
		/*
		 * Clear and select the working context for evaluation of the equality
		 * function and transition function.
		 */
		MemoryContextReset(workcontext);
		oldContext = MemoryContextSwitchTo(workcontext);

		/*
		 * If DISTINCT mode, and not distinct from prior, skip it.
		 */
		if (isDistinct &&
			haveOldVal &&
			((oldIsNull && *isNull) ||
			 (!oldIsNull && !*isNull &&
			  oldAbbrevVal == newAbbrevVal &&
			  DatumGetBool(FunctionCall2Coll(&pertrans->equalfnOne,
											 pertrans->aggCollation,
											 oldVal, *newVal)))))
		{
			/* equal to prior, so forget this one */
			if (!pertrans->inputtypeByVal && !*isNull)
				pfree(DatumGetPointer(*newVal));
		}
		else
		{
			advance_transition_function(aggstate, pertrans, pergroupstate);
			/* forget the old value, if any */
			if (!oldIsNull && !pertrans->inputtypeByVal)
				pfree(DatumGetPointer(oldVal));
			/* and remember the new one for subsequent equality checks */
			oldVal = *newVal;
			oldAbbrevVal = newAbbrevVal;
			oldIsNull = *isNull;
			haveOldVal = true;
		}

		MemoryContextSwitchTo(oldContext);
	}

	if (!oldIsNull && !pertrans->inputtypeByVal)
		pfree(DatumGetPointer(oldVal));

	tuplesort_end(pertrans->sortstates[aggstate->current_set]);
	pertrans->sortstates[aggstate->current_set] = NULL;
}

/*
 * Run the transition function for a DISTINCT or ORDER BY aggregate
 * with more than one input.  This is called after we have completed
 * entering all the input values into the sort object.  We complete the
 * sort, read out the values in sorted order, and run the transition
 * function on each value (applying DISTINCT if appropriate).
 *
 * This function handles only one grouping set (already set in
 * aggstate->current_set).
 *
 * When called, CurrentMemoryContext should be the per-query context.
 */
static void
process_ordered_aggregate_multi(AggState *aggstate,
								AggStatePerTrans pertrans,
								AggStatePerGroup pergroupstate)
{
	ExprContext *tmpcontext = aggstate->tmpcontext;
	FunctionCallInfo fcinfo = pertrans->transfn_fcinfo;
	TupleTableSlot *slot1 = pertrans->sortslot;
	TupleTableSlot *slot2 = pertrans->uniqslot;
	int			numTransInputs = pertrans->numTransInputs;
	int			numDistinctCols = pertrans->numDistinctCols;
	Datum		newAbbrevVal = (Datum) 0;
	Datum		oldAbbrevVal = (Datum) 0;
	bool		haveOldValue = false;
	TupleTableSlot *save = aggstate->tmpcontext->ecxt_outertuple;
	int			i;

	tuplesort_performsort(pertrans->sortstates[aggstate->current_set]);

	ExecClearTuple(slot1);
	if (slot2)
		ExecClearTuple(slot2);

	while (tuplesort_gettupleslot(pertrans->sortstates[aggstate->current_set],
								  true, true, slot1, &newAbbrevVal))
	{
		CHECK_FOR_INTERRUPTS();

		tmpcontext->ecxt_outertuple = slot1;
		tmpcontext->ecxt_innertuple = slot2;

		if (numDistinctCols == 0 ||
			!haveOldValue ||
			newAbbrevVal != oldAbbrevVal ||
			!ExecQual(pertrans->equalfnMulti, tmpcontext))
		{
			/*
			 * Extract the first numTransInputs columns as datums to pass to
			 * the transfn.
			 */
			slot_getsomeattrs(slot1, numTransInputs);

			/* Load values into fcinfo */
			/* Start from 1, since the 0th arg will be the transition value */
			for (i = 0; i < numTransInputs; i++)
			{
				fcinfo->args[i + 1].value = slot1->tts_values[i];
				fcinfo->args[i + 1].isnull = slot1->tts_isnull[i];
			}

			advance_transition_function(aggstate, pertrans, pergroupstate);

			if (numDistinctCols > 0)
			{
				/* swap the slot pointers to retain the current tuple */
				TupleTableSlot *tmpslot = slot2;

				slot2 = slot1;
				slot1 = tmpslot;
				/* avoid ExecQual() calls by reusing abbreviated keys */
				oldAbbrevVal = newAbbrevVal;
				haveOldValue = true;
			}
		}

		/* Reset context each time */
		ResetExprContext(tmpcontext);

		ExecClearTuple(slot1);
	}

	if (slot2)
		ExecClearTuple(slot2);

	tuplesort_end(pertrans->sortstates[aggstate->current_set]);
	pertrans->sortstates[aggstate->current_set] = NULL;

	/* restore previous slot, potentially in use for grouping sets */
	tmpcontext->ecxt_outertuple = save;
}

/*
 * Compute the final value of one aggregate.
 *
 * This function handles only one grouping set (already set in
 * aggstate->current_set).
 *
 * The finalfn will be run, and the result delivered, in the
 * output-tuple context; caller's CurrentMemoryContext does not matter.
 *
 * The finalfn uses the state as set in the transno. This also might be
 * being used by another aggregate function, so it's important that we do
 * nothing destructive here.
 */
static void
finalize_aggregate(AggState *aggstate,
				   AggStatePerAgg peragg,
				   AggStatePerGroup pergroupstate,
				   Datum *resultVal, bool *resultIsNull)
{
	LOCAL_FCINFO(fcinfo, FUNC_MAX_ARGS);
	bool		anynull = false;
	MemoryContext oldContext;
	int			i;
	ListCell   *lc;
	AggStatePerTrans pertrans = &aggstate->pertrans[peragg->transno];

	oldContext = MemoryContextSwitchTo(aggstate->ss.ps.ps_ExprContext->ecxt_per_tuple_memory);

	/*
	 * Evaluate any direct arguments.  We do this even if there's no finalfn
	 * (which is unlikely anyway), so that side-effects happen as expected.
	 * The direct arguments go into arg positions 1 and up, leaving position 0
	 * for the transition state value.
	 */
	i = 1;
	foreach(lc, peragg->aggdirectargs)
	{
		ExprState  *expr = (ExprState *) lfirst(lc);

		fcinfo->args[i].value = ExecEvalExpr(expr,
											 aggstate->ss.ps.ps_ExprContext,
											 &fcinfo->args[i].isnull);
		anynull |= fcinfo->args[i].isnull;
		i++;
	}

	/*
	 * Apply the agg's finalfn if one is provided, else return transValue.
	 */
	if (OidIsValid(peragg->finalfn_oid))
	{
		int			numFinalArgs = peragg->numFinalArgs;

		/* set up aggstate->curperagg for AggGetAggref() */
		aggstate->curperagg = peragg;

		InitFunctionCallInfoData(*fcinfo, &peragg->finalfn,
								 numFinalArgs,
								 pertrans->aggCollation,
								 (void *) aggstate, NULL);

		/* Fill in the transition state value */
		fcinfo->args[0].value =
			MakeExpandedObjectReadOnly(pergroupstate->transValue,
									   pergroupstate->transValueIsNull,
									   pertrans->transtypeLen);
		fcinfo->args[0].isnull = pergroupstate->transValueIsNull;
		anynull |= pergroupstate->transValueIsNull;

		/* Fill any remaining argument positions with nulls */
		for (; i < numFinalArgs; i++)
		{
			fcinfo->args[i].value = (Datum) 0;
			fcinfo->args[i].isnull = true;
			anynull = true;
		}

		if (fcinfo->flinfo->fn_strict && anynull)
		{
			/* don't call a strict function with NULL inputs */
			*resultVal = (Datum) 0;
			*resultIsNull = true;
		}
		else
		{
			*resultVal = FunctionCallInvoke(fcinfo);
			*resultIsNull = fcinfo->isnull;
		}
		aggstate->curperagg = NULL;
	}
	else
	{
		/* Don't need MakeExpandedObjectReadOnly; datumCopy will copy it */
		*resultVal = pergroupstate->transValue;
		*resultIsNull = pergroupstate->transValueIsNull;
	}

	/*
	 * If result is pass-by-ref, make sure it is in the right context.
	 */
	if (!peragg->resulttypeByVal && !*resultIsNull &&
		!MemoryContextContains(CurrentMemoryContext,
							   DatumGetPointer(*resultVal)))
		*resultVal = datumCopy(*resultVal,
							   peragg->resulttypeByVal,
							   peragg->resulttypeLen);

	MemoryContextSwitchTo(oldContext);
}

/*
 * Compute the output value of one partial aggregate.
 *
 * The serialization function will be run, and the result delivered, in the
 * output-tuple context; caller's CurrentMemoryContext does not matter.
 */
static void
finalize_partialaggregate(AggState *aggstate,
						  AggStatePerAgg peragg,
						  AggStatePerGroup pergroupstate,
						  Datum *resultVal, bool *resultIsNull)
{
	AggStatePerTrans pertrans = &aggstate->pertrans[peragg->transno];
	MemoryContext oldContext;

	oldContext = MemoryContextSwitchTo(aggstate->ss.ps.ps_ExprContext->ecxt_per_tuple_memory);

	/*
	 * serialfn_oid will be set if we must serialize the transvalue before
	 * returning it
	 */
	if (OidIsValid(pertrans->serialfn_oid))
	{
		/* Don't call a strict serialization function with NULL input. */
		if (pertrans->serialfn.fn_strict && pergroupstate->transValueIsNull)
		{
			*resultVal = (Datum) 0;
			*resultIsNull = true;
		}
		else
		{
			FunctionCallInfo fcinfo = pertrans->serialfn_fcinfo;

			fcinfo->args[0].value =
				MakeExpandedObjectReadOnly(pergroupstate->transValue,
										   pergroupstate->transValueIsNull,
										   pertrans->transtypeLen);
			fcinfo->args[0].isnull = pergroupstate->transValueIsNull;
			fcinfo->isnull = false;

			*resultVal = FunctionCallInvoke(fcinfo);
			*resultIsNull = fcinfo->isnull;
		}
	}
	else
	{
		/* Don't need MakeExpandedObjectReadOnly; datumCopy will copy it */
		*resultVal = pergroupstate->transValue;
		*resultIsNull = pergroupstate->transValueIsNull;
	}

	/* If result is pass-by-ref, make sure it is in the right context. */
	if (!peragg->resulttypeByVal && !*resultIsNull &&
		!MemoryContextContains(CurrentMemoryContext,
							   DatumGetPointer(*resultVal)))
		*resultVal = datumCopy(*resultVal,
							   peragg->resulttypeByVal,
							   peragg->resulttypeLen);

	MemoryContextSwitchTo(oldContext);
}

/*
 * Extract the attributes that make up the grouping key into the
 * hashslot. This is necessary to compute the hash or perform a lookup.
 */
static inline void
prepare_hash_slot(AggStatePerHash perhash,
				  TupleTableSlot *inputslot,
				  TupleTableSlot *hashslot)
{
	int			i;

	/* transfer just the needed columns into hashslot */
	slot_getsomeattrs(inputslot, perhash->largestGrpColIdx);
	ExecClearTuple(hashslot);

	for (i = 0; i < perhash->numhashGrpCols; i++)
	{
		int			varNumber = perhash->hashGrpColIdxInput[i] - 1;

		hashslot->tts_values[i] = inputslot->tts_values[varNumber];
		hashslot->tts_isnull[i] = inputslot->tts_isnull[varNumber];
	}
	ExecStoreVirtualTuple(hashslot);
}

/*
 * Prepare to finalize and project based on the specified representative tuple
 * slot and grouping set.
 *
 * In the specified tuple slot, force to null all attributes that should be
 * read as null in the context of the current grouping set.  Also stash the
 * current group bitmap where GroupingExpr can get at it.
 *
 * This relies on three conditions:
 *
 * 1) Nothing is ever going to try and extract the whole tuple from this slot,
 * only reference it in evaluations, which will only access individual
 * attributes.
 *
 * 2) No system columns are going to need to be nulled. (If a system column is
 * referenced in a group clause, it is actually projected in the outer plan
 * tlist.)
 *
 * 3) Within a given phase, we never need to recover the value of an attribute
 * once it has been set to null.
 *
 * Poking into the slot this way is a bit ugly, but the consensus is that the
 * alternative was worse.
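 *
 * Example (illustrative): with GROUPING SETS ((a), (b)), while projecting a
 * row for the (a) set this forces the b attribute of the representative
 * tuple to NULL, even though the slot physically carries a b value, and
 * GroupingExpr reads the stashed bitmap to report GROUPING(b) = 1.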
 */
static void
prepare_projection_slot(AggState *aggstate, TupleTableSlot *slot, int currentSet)
{
	if (aggstate->phase->grouped_cols)
	{
		Bitmapset  *grouped_cols = aggstate->phase->grouped_cols[currentSet];

		aggstate->grouped_cols = grouped_cols;

		if (TTS_EMPTY(slot))
		{
			/*
			 * Force all values to be NULL if working on an empty input tuple
			 * (i.e. an empty grouping set for which no input rows were
			 * supplied).
			 */
			ExecStoreAllNullTuple(slot);
		}
		else if (aggstate->all_grouped_cols)
		{
			ListCell   *lc;

			/* all_grouped_cols is arranged in desc order */
			slot_getsomeattrs(slot, linitial_int(aggstate->all_grouped_cols));

			foreach(lc, aggstate->all_grouped_cols)
			{
				int			attnum = lfirst_int(lc);

				if (!bms_is_member(attnum, grouped_cols))
					slot->tts_isnull[attnum - 1] = true;
			}
		}
	}
}

/*
 * Compute the final value of all aggregates for one group.
 *
 * This function handles only one grouping set at a time, which the caller must
 * have selected.  It's also the caller's responsibility to adjust the supplied
 * pergroup parameter to point to the current set's transvalues.
 *
 * Results are stored in the output econtext aggvalues/aggnulls.
 */
static void
finalize_aggregates(AggState *aggstate,
					AggStatePerAgg peraggs,
					AggStatePerGroup pergroup)
{
	ExprContext *econtext = aggstate->ss.ps.ps_ExprContext;
	Datum	   *aggvalues = econtext->ecxt_aggvalues;
	bool	   *aggnulls = econtext->ecxt_aggnulls;
	int			aggno;
	int			transno;

	/*
	 * If there were any DISTINCT and/or ORDER BY aggregates, sort their
	 * inputs and run the transition functions.
	 */
	for (transno = 0; transno < aggstate->numtrans; transno++)
	{
		AggStatePerTrans pertrans = &aggstate->pertrans[transno];
		AggStatePerGroup pergroupstate;

1328
		pergroupstate = &pergroup[transno];
1329

1330
		if (pertrans->numSortCols > 0)
1331
		{
1332 1333
			Assert(aggstate->aggstrategy != AGG_HASHED &&
				   aggstate->aggstrategy != AGG_MIXED);
1334

1335
			if (pertrans->numInputs == 1)
1336
				process_ordered_aggregate_single(aggstate,
1337
												 pertrans,
1338 1339 1340
												 pergroupstate);
			else
				process_ordered_aggregate_multi(aggstate,
1341
												pertrans,
1342 1343
												pergroupstate);
		}
1344 1345 1346 1347 1348 1349 1350 1351 1352 1353 1354
	}

	/*
	 * Run the final functions.
	 */
	for (aggno = 0; aggno < aggstate->numaggs; aggno++)
	{
		AggStatePerAgg peragg = &peraggs[aggno];
		int			transno = peragg->transno;
		AggStatePerGroup pergroupstate;

		pergroupstate = &pergroup[transno];

		if (DO_AGGSPLIT_SKIPFINAL(aggstate->aggsplit))
			finalize_partialaggregate(aggstate, peragg, pergroupstate,
									  &aggvalues[aggno], &aggnulls[aggno]);
		else
			finalize_aggregate(aggstate, peragg, pergroupstate,
							   &aggvalues[aggno], &aggnulls[aggno]);
	}
}

/*
 * Project the result of a group (whose aggs have already been calculated by
 * finalize_aggregates). Returns the result slot, or NULL if no row is
 * projected (suppressed by qual).
 */
static TupleTableSlot *
project_aggregates(AggState *aggstate)
{
	ExprContext *econtext = aggstate->ss.ps.ps_ExprContext;

	/*
	 * Check the qual (HAVING clause); if the group does not match, ignore it.
	 */
	if (ExecQual(aggstate->ss.ps.qual, econtext))
	{
		/*
		 * Form and return projection tuple using the aggregate results and
		 * the representative input tuple.
		 */
		return ExecProject(aggstate->ss.ps.ps_ProjInfo);
	}
	else
		InstrCountFiltered1(aggstate, 1);

	return NULL;
}

/*
 * Find input-tuple columns that are needed, dividing them into
 * aggregated and unaggregated sets.
 */
static void
find_cols(AggState *aggstate, Bitmapset **aggregated, Bitmapset **unaggregated)
{
	Agg		   *agg = (Agg *) aggstate->ss.ps.plan;
	FindColsContext context;

	context.is_aggref = false;
	context.aggregated = NULL;
	context.unaggregated = NULL;

	/* Examine tlist and quals */
	(void) find_cols_walker((Node *) agg->plan.targetlist, &context);
	(void) find_cols_walker((Node *) agg->plan.qual, &context);

	/* In some cases, grouping columns will not appear in the tlist */
	for (int i = 0; i < agg->numCols; i++)
		context.unaggregated = bms_add_member(context.unaggregated,
											  agg->grpColIdx[i]);

	*aggregated = context.aggregated;
	*unaggregated = context.unaggregated;
}

static bool
find_cols_walker(Node *node, FindColsContext *context)
{
	if (node == NULL)
		return false;
	if (IsA(node, Var))
	{
		Var		   *var = (Var *) node;

		/* setrefs.c should have set the varno to OUTER_VAR */
		Assert(var->varno == OUTER_VAR);
		Assert(var->varlevelsup == 0);
		if (context->is_aggref)
			context->aggregated = bms_add_member(context->aggregated,
												 var->varattno);
		else
			context->unaggregated = bms_add_member(context->unaggregated,
												   var->varattno);
		return false;
	}
	if (IsA(node, Aggref))
	{
		Assert(!context->is_aggref);
		context->is_aggref = true;
		expression_tree_walker(node, find_cols_walker, (void *) context);
		context->is_aggref = false;
		return false;
	}
	return expression_tree_walker(node, find_cols_walker,
								  (void *) context);
}
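
/*
 * Worked example (hypothetical query, for illustration): with a targetlist
 * of "a, sum(b)" and a qual of "c > 0" over the outer plan, the walker adds
 * b's attno to the aggregated set (it is referenced inside an Aggref) and
 * a's and c's attnos to the unaggregated set.
 */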

/*
 * (Re-)initialize the hash table(s) to empty.
 *
 * To implement hashed aggregation, we need a hashtable that stores a
 * representative tuple and an array of AggStatePerGroup structs for each
 * distinct set of GROUP BY column values.  We compute the hash key from the
 * GROUP BY columns.  The per-group data for each entry is allocated in
 * initialize_hash_entry().
 *
 * We have a separate hashtable and associated perhash data structure for each
 * grouping set for which we're doing hashing.
 *
 * The contents of the hash tables always live in the hashcontext's per-tuple
 * memory context (there is only one of these for all tables together, since
 * they are all reset at the same time).
 */
static void
build_hash_tables(AggState *aggstate)
{
	int			setno;

	for (setno = 0; setno < aggstate->num_hashes; ++setno)
	{
		AggStatePerHash perhash = &aggstate->perhash[setno];
		long		nbuckets;
		Size		memory;

		if (perhash->hashtable != NULL)
		{
			ResetTupleHashTable(perhash->hashtable);
			continue;
		}

		Assert(perhash->aggnode->numGroups > 0);

		memory = aggstate->hash_mem_limit / aggstate->num_hashes;

		/* choose reasonable number of buckets per hashtable */
		nbuckets = hash_choose_num_buckets(aggstate->hashentrysize,
										   perhash->aggnode->numGroups,
										   memory);

		build_hash_table(aggstate, setno, nbuckets);
	}

	aggstate->hash_ngroups_current = 0;
}

/*
 * Build a single hashtable for this grouping set.
 */
static void
build_hash_table(AggState *aggstate, int setno, long nbuckets)
{
	AggStatePerHash perhash = &aggstate->perhash[setno];
	MemoryContext metacxt = aggstate->hash_metacxt;
	MemoryContext hashcxt = aggstate->hashcontext->ecxt_per_tuple_memory;
	MemoryContext tmpcxt = aggstate->tmpcontext->ecxt_per_tuple_memory;
	Size		additionalsize;

	Assert(aggstate->aggstrategy == AGG_HASHED ||
		   aggstate->aggstrategy == AGG_MIXED);

	/*
	 * Used to make sure initial hash table allocation does not exceed
	 * hash_mem. Note that the estimate does not include space for
	 * pass-by-reference transition data values, nor for the representative
	 * tuple of each group.
	 */
	additionalsize = aggstate->numtrans * sizeof(AggStatePerGroupData);

	perhash->hashtable = BuildTupleHashTableExt(&aggstate->ss.ps,
												perhash->hashslot->tts_tupleDescriptor,
												perhash->numCols,
												perhash->hashGrpColIdxHash,
												perhash->eqfuncoids,
												perhash->hashfunctions,
												perhash->aggnode->grpCollations,
												nbuckets,
												additionalsize,
												metacxt,
												hashcxt,
												tmpcxt,
												DO_AGGSPLIT_SKIPFINAL(aggstate->aggsplit));
}

/*
 * Compute columns that actually need to be stored in hashtable entries.  The
 * incoming tuples from the child plan node will contain grouping columns,
 * other columns referenced in our targetlist and qual, columns used to
 * compute the aggregate functions, and perhaps just junk columns we don't use
 * at all.  Only columns of the first two types need to be stored in the
 * hashtable, and getting rid of the others can make the table entries
 * significantly smaller.  The hashtable only contains the relevant columns,
 * and is packed/unpacked in prepare_hash_slot() / agg_retrieve_hash_table_in_memory()
 * into the format of the normal input descriptor.
 *
 * Besides the columns being grouped by, additional columns come from two
 * sources: first, functionally dependent columns that we don't need to group
 * by themselves; and second, ctids for row-marks.
 *
 * To eliminate duplicates, we build a bitmapset of the needed columns, and
 * then build an array of the columns included in the hashtable. We might
 * still have duplicates if the passed-in grpColIdx has them, which can happen
 * in edge cases from semijoins/distinct; these can't always be removed,
 * because it's not certain that the duplicate cols will be using the same
 * hash function.
 *
 * Note that the array is preserved over ExecReScanAgg, so we allocate it in
 * the per-query context (unlike the hash table itself).
 */
static void
find_hash_columns(AggState *aggstate)
{
	Bitmapset  *base_colnos;
	Bitmapset  *aggregated_colnos;
	TupleDesc	scanDesc = aggstate->ss.ss_ScanTupleSlot->tts_tupleDescriptor;
	List	   *outerTlist = outerPlanState(aggstate)->plan->targetlist;
	int			numHashes = aggstate->num_hashes;
	EState	   *estate = aggstate->ss.ps.state;
	int			j;

	/* Find Vars that will be needed in tlist and qual */
	find_cols(aggstate, &aggregated_colnos, &base_colnos);
	aggstate->colnos_needed = bms_union(base_colnos, aggregated_colnos);
	aggstate->max_colno_needed = 0;
	aggstate->all_cols_needed = true;

	for (int i = 0; i < scanDesc->natts; i++)
	{
		int			colno = i + 1;

		if (bms_is_member(colno, aggstate->colnos_needed))
			aggstate->max_colno_needed = colno;
		else
			aggstate->all_cols_needed = false;
	}

	for (j = 0; j < numHashes; ++j)
	{
		AggStatePerHash perhash = &aggstate->perhash[j];
		Bitmapset  *colnos = bms_copy(base_colnos);
		AttrNumber *grpColIdx = perhash->aggnode->grpColIdx;
		List	   *hashTlist = NIL;
		TupleDesc	hashDesc;
		int			maxCols;
		int			i;

		perhash->largestGrpColIdx = 0;

		/*
		 * If we're doing grouping sets, then some Vars might be referenced in
		 * tlist/qual for the benefit of other grouping sets, but not needed
		 * when hashing; i.e. prepare_projection_slot will null them out, so
		 * there'd be no point storing them.  Use prepare_projection_slot's
		 * logic to determine which.
		 */
		if (aggstate->phases[0].grouped_cols)
		{
			Bitmapset  *grouped_cols = aggstate->phases[0].grouped_cols[j];
			ListCell   *lc;

			foreach(lc, aggstate->all_grouped_cols)
			{
				int			attnum = lfirst_int(lc);

				if (!bms_is_member(attnum, grouped_cols))
					colnos = bms_del_member(colnos, attnum);
			}
		}

		/*
		 * Compute maximum number of input columns accounting for possible
		 * duplications in the grpColIdx array, which can happen in some edge
		 * cases where HashAggregate was generated as part of a semijoin or a
		 * DISTINCT.
		 */
		maxCols = bms_num_members(colnos) + perhash->numCols;

		perhash->hashGrpColIdxInput =
			palloc(maxCols * sizeof(AttrNumber));
		perhash->hashGrpColIdxHash =
			palloc(perhash->numCols * sizeof(AttrNumber));

		/* Add all the grouping columns to colnos */
		for (i = 0; i < perhash->numCols; i++)
			colnos = bms_add_member(colnos, grpColIdx[i]);

		/*
		 * First build mapping for columns directly hashed. These are the
		 * first, because they'll be accessed when computing hash values and
		 * comparing tuples for exact matches. We also build simple mapping
		 * for execGrouping, so it knows where to find the to-be-hashed /
		 * compared columns in the input.
		 */
		for (i = 0; i < perhash->numCols; i++)
		{
			perhash->hashGrpColIdxInput[i] = grpColIdx[i];
			perhash->hashGrpColIdxHash[i] = i + 1;
			perhash->numhashGrpCols++;
			/* delete already mapped columns */
			bms_del_member(colnos, grpColIdx[i]);
		}

		/* and add the remaining columns */
		while ((i = bms_first_member(colnos)) >= 0)
		{
			perhash->hashGrpColIdxInput[perhash->numhashGrpCols] = i;
			perhash->numhashGrpCols++;
		}

		/* and build a tuple descriptor for the hashtable */
		for (i = 0; i < perhash->numhashGrpCols; i++)
		{
			int			varNumber = perhash->hashGrpColIdxInput[i] - 1;

			hashTlist = lappend(hashTlist, list_nth(outerTlist, varNumber));
			perhash->largestGrpColIdx =
				Max(varNumber + 1, perhash->largestGrpColIdx);
		}

		hashDesc = ExecTypeFromTL(hashTlist);

		execTuplesHashPrepare(perhash->numCols,
							  perhash->aggnode->grpOperators,
							  &perhash->eqfuncoids,
							  &perhash->hashfunctions);
		perhash->hashslot =
			ExecAllocTableSlot(&estate->es_tupleTable, hashDesc,
							   &TTSOpsMinimalTuple);

		list_free(hashTlist);
		bms_free(colnos);
	}

	bms_free(base_colnos);
}
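
/*
 * Mapping example with made-up column numbers: given grpColIdx = {3, 5} and
 * one extra needed input column 2, the loops above produce
 *
 *		hashGrpColIdxInput = {3, 5, 2}	(positions in the input slot)
 *		hashGrpColIdxHash  = {1, 2}		(positions in the hashtable tuple)
 *
 * so hashtable entries carry only those three columns, grouping columns
 * first.
 */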

/*
 * Estimate per-hash-table-entry overhead.
 */
Size
hash_agg_entry_size(int numTrans, Size tupleWidth, Size transitionSpace)
{
	Size		tupleChunkSize;
	Size		pergroupChunkSize;
	Size		transitionChunkSize;
	Size		tupleSize = (MAXALIGN(SizeofMinimalTupleHeader) +
							 tupleWidth);
	Size		pergroupSize = numTrans * sizeof(AggStatePerGroupData);

	tupleChunkSize = CHUNKHDRSZ + tupleSize;

	if (pergroupSize > 0)
		pergroupChunkSize = CHUNKHDRSZ + pergroupSize;
	else
		pergroupChunkSize = 0;

	if (transitionSpace > 0)
		transitionChunkSize = CHUNKHDRSZ + transitionSpace;
	else
		transitionChunkSize = 0;

	return
		sizeof(TupleHashEntryData) +
		tupleChunkSize +
		pergroupChunkSize +
		transitionChunkSize;
}
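
/*
 * Rough worked example (illustrative sizes only; the real numbers are
 * platform- and build-dependent): with numTrans = 1, tupleWidth = 32 and
 * transitionSpace = 0, and assuming CHUNKHDRSZ = 16, a MAXALIGN'd
 * minimal-tuple header of 16 bytes, sizeof(AggStatePerGroupData) = 16 and
 * sizeof(TupleHashEntryData) = 24, the estimate works out to
 * 24 + (16 + 16 + 32) + (16 + 16) + 0 = 120 bytes per hash entry.
 */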

/*
 * hashagg_recompile_expressions()
 *
 * Identifies the right phase, compiles the right expression given the
 * arguments, and then sets phase->evaltrans to that expression.
 *
 * Different versions of the compiled expression are needed depending on
 * whether hash aggregation has spilled or not, and whether it's reading from
 * the outer plan or a tape. Before spilling to disk, the expression reads
 * from the outer plan and does not need to perform a NULL check. After
 * HashAgg begins to spill, new groups will not be created in the hash table,
 * and the AggStatePerGroup array may be NULL; therefore we need to add a null
 * pointer check to the expression. Then, when reading spilled data from a
 * tape, we change the outer slot type to be a fixed minimal tuple slot.
 *
 * It would be wasteful to recompile every time, so cache the compiled
 * expressions in the AggStatePerPhase, and reuse when appropriate.
 */
static void
hashagg_recompile_expressions(AggState *aggstate, bool minslot, bool nullcheck)
{
	AggStatePerPhase phase;
	int			i = minslot ? 1 : 0;
	int			j = nullcheck ? 1 : 0;

	Assert(aggstate->aggstrategy == AGG_HASHED ||
		   aggstate->aggstrategy == AGG_MIXED);

	if (aggstate->aggstrategy == AGG_HASHED)
		phase = &aggstate->phases[0];
	else						/* AGG_MIXED */
		phase = &aggstate->phases[1];

	if (phase->evaltrans_cache[i][j] == NULL)
	{
		const TupleTableSlotOps *outerops = aggstate->ss.ps.outerops;
		bool		outerfixed = aggstate->ss.ps.outeropsfixed;
		bool		dohash = true;
		bool		dosort = false;

		/*
		 * If minslot is true, that means we are processing a spilled batch
		 * (inside agg_refill_hash_table()), and we must not advance the
		 * sorted grouping sets.
		 */
		if (aggstate->aggstrategy == AGG_MIXED && !minslot)
			dosort = true;

		/* temporarily change the outerops while compiling the expression */
		if (minslot)
		{
			aggstate->ss.ps.outerops = &TTSOpsMinimalTuple;
			aggstate->ss.ps.outeropsfixed = true;
		}

		phase->evaltrans_cache[i][j] = ExecBuildAggTrans(aggstate, phase,
														 dosort, dohash,
														 nullcheck);

		/* change back */
		aggstate->ss.ps.outerops = outerops;
		aggstate->ss.ps.outeropsfixed = outerfixed;
	}

	phase->evaltrans = phase->evaltrans_cache[i][j];
}
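
/*
 * The cache above is indexed as evaltrans_cache[minslot][nullcheck]:
 * [0][0] is the initial expression reading from the outer plan with no NULL
 * check, [0][1] reads from the outer plan after spilling has begun (NULL
 * check added), and [1][1] reads spilled batches from a minimal tuple slot.
 * ([1][0] is representable but not requested by the callers shown here.)
 */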

/*
 * Set limits that trigger spilling to avoid exceeding hash_mem. Consider the
 * number of partitions we expect to create (if we do spill).
 *
 * There are two limits: a memory limit, and also an ngroups limit. The
 * ngroups limit becomes important when we expect transition values to grow
 * substantially larger than the initial value.
 */
void
hash_agg_set_limits(double hashentrysize, double input_groups, int used_bits,
					Size *mem_limit, uint64 *ngroups_limit,
					int *num_partitions)
{
	int			npartitions;
	Size		partition_mem;
	Size		hash_mem_limit = get_hash_memory_limit();

	/* if not expected to spill, use all of hash_mem */
	if (input_groups * hashentrysize <= hash_mem_limit)
	{
		if (num_partitions != NULL)
			*num_partitions = 0;
		*mem_limit = hash_mem_limit;
		*ngroups_limit = hash_mem_limit / hashentrysize;
		return;
	}

	/*
	 * Calculate expected memory requirements for spilling, which is the size
	 * of the buffers needed for all the tapes that need to be open at once.
	 * Then, subtract that from the memory available for holding hash tables.
	 */
	npartitions = hash_choose_num_partitions(input_groups,
											 hashentrysize,
											 used_bits,
											 NULL);
	if (num_partitions != NULL)
		*num_partitions = npartitions;

	partition_mem =
		HASHAGG_READ_BUFFER_SIZE +
		HASHAGG_WRITE_BUFFER_SIZE * npartitions;

	/*
	 * Don't set the limit below 3/4 of hash_mem. In that case, we are at the
	 * minimum number of partitions, so we aren't going to dramatically exceed
	 * hash_mem anyway.
	 */
	if (hash_mem_limit > 4 * partition_mem)
		*mem_limit = hash_mem_limit - partition_mem;
	else
		*mem_limit = hash_mem_limit * 0.75;

	if (*mem_limit > hashentrysize)
		*ngroups_limit = *mem_limit / hashentrysize;
	else
		*ngroups_limit = 1;
}
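
/*
 * Example with round numbers (illustrative only): if hash_mem_limit is
 * 4194304 bytes (4MB), hashentrysize is 256 and input_groups is 10000, then
 * 10000 * 256 = 2560000 <= 4194304, so no spilling is expected and we set
 * *mem_limit = 4194304 and *ngroups_limit = 4194304 / 256 = 16384.
 */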

/*
 * hash_agg_check_limits
 *
 * After adding a new group to the hash table, check whether we need to enter
 * spill mode. Allocations may happen without adding new groups (for instance,
 * if the transition state size grows), so this check is imperfect.
 */
static void
hash_agg_check_limits(AggState *aggstate)
{
	uint64		ngroups = aggstate->hash_ngroups_current;
	Size		meta_mem = MemoryContextMemAllocated(aggstate->hash_metacxt,
													 true);
	Size		hashkey_mem = MemoryContextMemAllocated(aggstate->hashcontext->ecxt_per_tuple_memory,
														true);

	/*
	 * Don't spill unless there's at least one group in the hash table so we
	 * can be sure to make progress even in edge cases.
	 */
	if (aggstate->hash_ngroups_current > 0 &&
		(meta_mem + hashkey_mem > aggstate->hash_mem_limit ||
		 ngroups > aggstate->hash_ngroups_limit))
	{
		hash_agg_enter_spill_mode(aggstate);
	}
}

/*
 * Enter "spill mode", meaning that no new groups are added to any of the hash
 * tables. Tuples that would create a new group are instead spilled, and
 * processed later.
 */
static void
hash_agg_enter_spill_mode(AggState *aggstate)
{
	aggstate->hash_spill_mode = true;
	hashagg_recompile_expressions(aggstate, aggstate->table_filled, true);

	if (!aggstate->hash_ever_spilled)
	{
		Assert(aggstate->hash_tapeinfo == NULL);
		Assert(aggstate->hash_spills == NULL);

		aggstate->hash_ever_spilled = true;

		hashagg_tapeinfo_init(aggstate);

		aggstate->hash_spills = palloc(sizeof(HashAggSpill) * aggstate->num_hashes);

		for (int setno = 0; setno < aggstate->num_hashes; setno++)
		{
			AggStatePerHash perhash = &aggstate->perhash[setno];
			HashAggSpill *spill = &aggstate->hash_spills[setno];

			hashagg_spill_init(spill, aggstate->hash_tapeinfo, 0,
							   perhash->aggnode->numGroups,
							   aggstate->hashentrysize);
		}
	}
}

/*
 * Update metrics after filling the hash table.
 *
 * If reading from the outer plan, from_tape should be false; if reading from
 * another tape, from_tape should be true.
 */
static void
hash_agg_update_metrics(AggState *aggstate, bool from_tape, int npartitions)
{
	Size		meta_mem;
	Size		hashkey_mem;
	Size		buffer_mem;
	Size		total_mem;

	if (aggstate->aggstrategy != AGG_MIXED &&
		aggstate->aggstrategy != AGG_HASHED)
		return;

	/* memory for the hash table itself */
	meta_mem = MemoryContextMemAllocated(aggstate->hash_metacxt, true);

	/* memory for the group keys and transition states */
	hashkey_mem = MemoryContextMemAllocated(aggstate->hashcontext->ecxt_per_tuple_memory, true);

	/* memory for read/write tape buffers, if spilled */
	buffer_mem = npartitions * HASHAGG_WRITE_BUFFER_SIZE;
	if (from_tape)
		buffer_mem += HASHAGG_READ_BUFFER_SIZE;

	/* update peak mem */
	total_mem = meta_mem + hashkey_mem + buffer_mem;
	if (total_mem > aggstate->hash_mem_peak)
		aggstate->hash_mem_peak = total_mem;

	/* update disk usage */
	if (aggstate->hash_tapeinfo != NULL)
	{
		uint64		disk_used = LogicalTapeSetBlocks(aggstate->hash_tapeinfo->tapeset) * (BLCKSZ / 1024);

		if (aggstate->hash_disk_used < disk_used)
			aggstate->hash_disk_used = disk_used;
	}

	/* update hashentrysize estimate based on contents */
	if (aggstate->hash_ngroups_current > 0)
	{
		aggstate->hashentrysize =
			sizeof(TupleHashEntryData) +
			(hashkey_mem / (double) aggstate->hash_ngroups_current);
	}
}

/*
 * Choose a reasonable number of buckets for the initial hash table size.
 */
static long
hash_choose_num_buckets(double hashentrysize, long ngroups, Size memory)
{
	long		max_nbuckets;
	long		nbuckets = ngroups;

	max_nbuckets = memory / hashentrysize;

	/*
	 * Underestimating is better than overestimating. Too many buckets crowd
	 * out space for group keys and transition state values.
	 */
	max_nbuckets >>= 1;

	if (nbuckets > max_nbuckets)
		nbuckets = max_nbuckets;

	return Max(nbuckets, 1);
}
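
/*
 * Example (illustrative numbers): with memory = 1048576 and
 * hashentrysize = 256, max_nbuckets starts at 4096 and is halved to 2048,
 * so an ngroups estimate of 10000 is clamped down to 2048 buckets.
 */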

/*
 * Determine the number of partitions to create when spilling, which will
 * always be a power of two. If log2_npartitions is non-NULL, set
 * *log2_npartitions to the log2() of the number of partitions.
 */
static int
hash_choose_num_partitions(double input_groups, double hashentrysize,
						   int used_bits, int *log2_npartitions)
{
	Size		hash_mem_limit = get_hash_memory_limit();
	double		partition_limit;
	double		mem_wanted;
	double		dpartitions;
	int			npartitions;
	int			partition_bits;

	/*
	 * Avoid creating so many partitions that the memory requirements of the
	 * open partition files are greater than 1/4 of hash_mem.
	 */
	partition_limit =
		(hash_mem_limit * 0.25 - HASHAGG_READ_BUFFER_SIZE) /
		HASHAGG_WRITE_BUFFER_SIZE;

	mem_wanted = HASHAGG_PARTITION_FACTOR * input_groups * hashentrysize;

	/* make enough partitions so that each one is likely to fit in memory */
	dpartitions = 1 + (mem_wanted / hash_mem_limit);

	if (dpartitions > partition_limit)
		dpartitions = partition_limit;

	if (dpartitions < HASHAGG_MIN_PARTITIONS)
		dpartitions = HASHAGG_MIN_PARTITIONS;
	if (dpartitions > HASHAGG_MAX_PARTITIONS)
		dpartitions = HASHAGG_MAX_PARTITIONS;

	/* HASHAGG_MAX_PARTITIONS limit makes this safe */
	npartitions = (int) dpartitions;

	/* ceil(log2(npartitions)) */
	partition_bits = my_log2(npartitions);

	/* make sure that we don't exhaust the hash bits */
	if (partition_bits + used_bits >= 32)
		partition_bits = 32 - used_bits;

	if (log2_npartitions != NULL)
		*log2_npartitions = partition_bits;

	/* number of partitions will be a power of two */
	npartitions = 1 << partition_bits;

	return npartitions;
}
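
/*
 * Example (illustrative, assuming a partition factor of 1.5 and ignoring
 * the partition_limit clamp): with input_groups = 1000000, hashentrysize =
 * 64 and a 4MB hash_mem limit, mem_wanted is 1.5 * 1000000 * 64 = 96MB, so
 * dpartitions = 1 + 96MB / 4MB = 25; my_log2(25) rounds up to 5 bits and
 * the final answer is 1 << 5 = 32 partitions.
 */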

/*
 * Initialize a freshly-created TupleHashEntry.
 */
static void
initialize_hash_entry(AggState *aggstate, TupleHashTable hashtable,
					  TupleHashEntry entry)
{
	AggStatePerGroup pergroup;
	int			transno;

	aggstate->hash_ngroups_current++;
	hash_agg_check_limits(aggstate);

	/* no need to allocate or initialize per-group state */
	if (aggstate->numtrans == 0)
		return;

	pergroup = (AggStatePerGroup)
		MemoryContextAlloc(hashtable->tablecxt,
						   sizeof(AggStatePerGroupData) * aggstate->numtrans);

	entry->additional = pergroup;

	/*
	 * Initialize aggregates for the new tuple group; lookup_hash_entries()
	 * has already selected the relevant grouping set.
	 */
	for (transno = 0; transno < aggstate->numtrans; transno++)
	{
		AggStatePerTrans pertrans = &aggstate->pertrans[transno];
		AggStatePerGroup pergroupstate = &pergroup[transno];

		initialize_aggregate(aggstate, pertrans, pergroupstate);
	}
}

/*
 * Look up hash entries for the current tuple in all hashed grouping sets.
 *
 * Be aware that the hash-entry lookups can reset the tmpcontext.
 *
 * Some entries may be left NULL if we are in "spill mode". The same tuple
 * will belong to different groups for each grouping set, so may match a group
 * already in memory for one set and match a group not in memory for another
 * set. When in "spill mode", the tuple will be spilled for each grouping set
 * where it doesn't match a group in memory.
 *
 * NB: It's possible to spill the same tuple for several different grouping
 * sets. This may seem wasteful, but it's actually a trade-off: if we spill
 * the tuple multiple times for multiple grouping sets, it can be partitioned
 * for each grouping set, making the refilling of the hash table very
 * efficient.
 */
static void
lookup_hash_entries(AggState *aggstate)
{
	AggStatePerGroup *pergroup = aggstate->hash_pergroup;
	TupleTableSlot *outerslot = aggstate->tmpcontext->ecxt_outertuple;
	int			setno;

	for (setno = 0; setno < aggstate->num_hashes; setno++)
	{
		AggStatePerHash perhash = &aggstate->perhash[setno];
		TupleHashTable hashtable = perhash->hashtable;
		TupleTableSlot *hashslot = perhash->hashslot;
		TupleHashEntry entry;
		uint32		hash;
		bool		isnew = false;
		bool	   *p_isnew;

		/* if hash table already spilled, don't create new entries */
		p_isnew = aggstate->hash_spill_mode ? NULL : &isnew;

		select_current_set(aggstate, setno, true);
		prepare_hash_slot(perhash,
						  outerslot,
						  hashslot);

		entry = LookupTupleHashEntry(hashtable, hashslot,
									 p_isnew, &hash);

		if (entry != NULL)
		{
			if (isnew)
				initialize_hash_entry(aggstate, hashtable, entry);
			pergroup[setno] = entry->additional;
		}
		else
		{
			HashAggSpill *spill = &aggstate->hash_spills[setno];
			TupleTableSlot *slot = aggstate->tmpcontext->ecxt_outertuple;

			if (spill->partitions == NULL)
				hashagg_spill_init(spill, aggstate->hash_tapeinfo, 0,
								   perhash->aggnode->numGroups,
								   aggstate->hashentrysize);

			hashagg_spill_tuple(aggstate, spill, slot, hash);
			pergroup[setno] = NULL;
		}
	}
}

/*
 * ExecAgg -
 *
 *	  ExecAgg receives tuples from its outer subplan and aggregates over
 *	  the appropriate attribute for each aggregate function use (Aggref
 *	  node) appearing in the targetlist or qual of the node.  The number
 *	  of tuples to aggregate over depends on whether grouped or plain
 *	  aggregation is selected.  In grouped aggregation, we produce a result
 *	  row for each group; in plain aggregation there's a single result row
 *	  for the whole query.  In either case, the value of each aggregate is
 *	  stored in the expression context to be used when ExecProject evaluates
 *	  the result tuple.
 */
static TupleTableSlot *
ExecAgg(PlanState *pstate)
{
	AggState   *node = castNode(AggState, pstate);
	TupleTableSlot *result = NULL;

	CHECK_FOR_INTERRUPTS();

	if (!node->agg_done)
	{
		/* Dispatch based on strategy */
		switch (node->phase->aggstrategy)
		{
			case AGG_HASHED:
				if (!node->table_filled)
					agg_fill_hash_table(node);
				/* FALLTHROUGH */
			case AGG_MIXED:
				result = agg_retrieve_hash_table(node);
				break;
			case AGG_PLAIN:
			case AGG_SORTED:
				result = agg_retrieve_direct(node);
				break;
		}

		if (!TupIsNull(result))
			return result;
	}

	return NULL;
}

/*
 * ExecAgg for non-hashed case
 */
static TupleTableSlot *
agg_retrieve_direct(AggState *aggstate)
{
	Agg		   *node = aggstate->phase->aggnode;
	ExprContext *econtext;
	ExprContext *tmpcontext;
	AggStatePerAgg peragg;
	AggStatePerGroup *pergroups;
	TupleTableSlot *outerslot;
	TupleTableSlot *firstSlot;
	TupleTableSlot *result;
	bool		hasGroupingSets = aggstate->phase->numsets > 0;
	int			numGroupingSets = Max(aggstate->phase->numsets, 1);
	int			currentSet;
	int			nextSetSize;
	int			numReset;
	int			i;

	/*
	 * get state info from node
	 *
	 * econtext is the per-output-tuple expression context
	 *
	 * tmpcontext is the per-input-tuple expression context
	 */
	econtext = aggstate->ss.ps.ps_ExprContext;
	tmpcontext = aggstate->tmpcontext;

	peragg = aggstate->peragg;
	pergroups = aggstate->pergroups;
	firstSlot = aggstate->ss.ss_ScanTupleSlot;

	/*
	 * We loop retrieving groups until we find one matching
	 * aggstate->ss.ps.qual
	 *
	 * For grouping sets, we have the invariant that aggstate->projected_set
	 * is either -1 (initial call) or the index (starting from 0) in
	 * gset_lengths for the group we just completed (either by projecting a
	 * row or by discarding it in the qual).
	 */
	while (!aggstate->agg_done)
	{
		/*
		 * Clear the per-output-tuple context for each group, as well as
		 * aggcontext (which contains any pass-by-ref transvalues of the old
		 * group).  Some aggregate functions store working state in child
		 * contexts; those now get reset automatically without us needing to
		 * do anything special.
		 *
		 * We use ReScanExprContext not just ResetExprContext because we want
		 * any registered shutdown callbacks to be called.  That allows
		 * aggregate functions to ensure they've cleaned up any non-memory
		 * resources.
		 */
		ReScanExprContext(econtext);

		/*
		 * Determine how many grouping sets need to be reset at this boundary.
		 */
		if (aggstate->projected_set >= 0 &&
			aggstate->projected_set < numGroupingSets)
			numReset = aggstate->projected_set + 1;
		else
			numReset = numGroupingSets;

		/*
		 * numReset can change on a phase boundary, but that's OK; we want to
		 * reset the contexts used in _this_ phase, and later, after possibly
		 * changing phase, initialize the right number of aggregates for the
		 * _new_ phase.
		 */

		for (i = 0; i < numReset; i++)
		{
			ReScanExprContext(aggstate->aggcontexts[i]);
		}

		/*
		 * Check if input is complete and there are no more groups to project
		 * in this phase; move to next phase or mark as done.
		 */
		if (aggstate->input_done == true &&
			aggstate->projected_set >= (numGroupingSets - 1))
		{
			if (aggstate->current_phase < aggstate->numphases - 1)
			{
				initialize_phase(aggstate, aggstate->current_phase + 1);
				aggstate->input_done = false;
				aggstate->projected_set = -1;
				numGroupingSets = Max(aggstate->phase->numsets, 1);
				node = aggstate->phase->aggnode;
				numReset = numGroupingSets;
			}
			else if (aggstate->aggstrategy == AGG_MIXED)
			{
				/*
				 * Mixed mode; we've output all the grouped stuff and have
				 * full hashtables, so switch to outputting those.
				 */
				initialize_phase(aggstate, 0);
				aggstate->table_filled = true;
				ResetTupleHashIterator(aggstate->perhash[0].hashtable,
									   &aggstate->perhash[0].hashiter);
				select_current_set(aggstate, 0, true);
				return agg_retrieve_hash_table(aggstate);
			}
			else
			{
				aggstate->agg_done = true;
				break;
			}
		}

		/*
		 * Get the number of columns in the next grouping set after the last
		 * projected one (if any). This is the number of columns to compare to
		 * see if we reached the boundary of that set too.
		 */
		if (aggstate->projected_set >= 0 &&
			aggstate->projected_set < (numGroupingSets - 1))
			nextSetSize = aggstate->phase->gset_lengths[aggstate->projected_set + 1];
		else
			nextSetSize = 0;

		/*----------
		 * If a subgroup for the current grouping set is present, project it.
		 *
		 * We have a new group if:
		 *	- we're out of input but haven't projected all grouping sets
		 *	  (checked above)
		 * OR
		 *	  - we already projected a row that wasn't from the last grouping
		 *		set
		 *	  AND
		 *	  - the next grouping set has at least one grouping column (since
		 *		empty grouping sets project only once input is exhausted)
		 *	  AND
		 *	  - the previous and pending rows differ on the grouping columns
		 *		of the next grouping set
		 *----------
		 */
		tmpcontext->ecxt_innertuple = econtext->ecxt_outertuple;
		if (aggstate->input_done ||
			(node->aggstrategy != AGG_PLAIN &&
			 aggstate->projected_set != -1 &&
			 aggstate->projected_set < (numGroupingSets - 1) &&
			 nextSetSize > 0 &&
			 !ExecQualAndReset(aggstate->phase->eqfunctions[nextSetSize - 1],
							   tmpcontext)))
		{
			aggstate->projected_set += 1;

			Assert(aggstate->projected_set < numGroupingSets);
			Assert(nextSetSize > 0 || aggstate->input_done);
		}
		else
		{
			/*
			 * We no longer care what group we just projected, the next
			 * projection will always be the first (or only) grouping set
			 * (unless the input proves to be empty).
			 */
			aggstate->projected_set = 0;

			/*
			 * If we don't already have the first tuple of the new group,
			 * fetch it from the outer plan.
			 */
			if (aggstate->grp_firstTuple == NULL)
			{
				outerslot = fetch_input_tuple(aggstate);
				if (!TupIsNull(outerslot))
				{
					/*
					 * Make a copy of the first input tuple; we will use this
					 * for comparisons (in group mode) and for projection.
					 */
					aggstate->grp_firstTuple = ExecCopySlotHeapTuple(outerslot);
				}
				else
				{
					/* outer plan produced no tuples at all */
					if (hasGroupingSets)
					{
						/*
						 * If there was no input at all, we need to project
						 * rows only if there are grouping sets of size 0.
						 * Note that this implies that there can't be any
						 * references to ungrouped Vars, which would otherwise
						 * cause issues with the empty output slot.
						 *
						 * XXX: This is no longer true, we currently deal with
						 * this in finalize_aggregates().
						 */
						aggstate->input_done = true;

						while (aggstate->phase->gset_lengths[aggstate->projected_set] > 0)
						{
							aggstate->projected_set += 1;
							if (aggstate->projected_set >= numGroupingSets)
							{
								/*
								 * We can't set agg_done here because we might
								 * have more phases to do, even though the
								 * input is empty. So we need to restart the
								 * whole outer loop.
								 */
								break;
							}
						}

						if (aggstate->projected_set >= numGroupingSets)
							continue;
					}
					else
					{
						aggstate->agg_done = true;
						/* If we are grouping, we should produce no tuples at all */
						if (node->aggstrategy != AGG_PLAIN)
							return NULL;
					}
				}
			}

			/*
			 * Initialize working state for a new input tuple group.
			 */
			initialize_aggregates(aggstate, pergroups, numReset);

			if (aggstate->grp_firstTuple != NULL)
			{
				/*
				 * Store the copied first input tuple in the tuple table slot
				 * reserved for it.  The tuple will be deleted when it is
				 * cleared from the slot.
				 */
				ExecForceStoreHeapTuple(aggstate->grp_firstTuple,
										firstSlot, true);
				aggstate->grp_firstTuple = NULL;	/* don't keep two pointers */

				/* set up for first advance_aggregates call */
				tmpcontext->ecxt_outertuple = firstSlot;

				/*
				 * Process each outer-plan tuple, and then fetch the next one,
				 * until we exhaust the outer plan or cross a group boundary.
				 */
				for (;;)
				{
					/*
					 * During phase 1 only of a mixed agg, we need to update
					 * hashtables as well in advance_aggregates.
					 */
					if (aggstate->aggstrategy == AGG_MIXED &&
						aggstate->current_phase == 1)
					{
						lookup_hash_entries(aggstate);
					}

					/* Advance the aggregates (or combine functions) */
					advance_aggregates(aggstate);

					/* Reset per-input-tuple context after each tuple */
					ResetExprContext(tmpcontext);

					outerslot = fetch_input_tuple(aggstate);
					if (TupIsNull(outerslot))
					{
						/* no more outer-plan tuples available */

						/* if we built hash tables, finalize any spills */
						if (aggstate->aggstrategy == AGG_MIXED &&
							aggstate->current_phase == 1)
							hashagg_finish_initial_spills(aggstate);

						if (hasGroupingSets)
						{
							aggstate->input_done = true;
							break;
						}
						else
						{
							aggstate->agg_done = true;
							break;
						}
					}
					/* set up for next advance_aggregates call */
					tmpcontext->ecxt_outertuple = outerslot;

					/*
					 * If we are grouping, check whether we've crossed a group
					 * boundary.
					 */
					if (node->aggstrategy != AGG_PLAIN)
					{
						tmpcontext->ecxt_innertuple = firstSlot;
						if (!ExecQual(aggstate->phase->eqfunctions[node->numCols - 1],
									  tmpcontext))
						{
							aggstate->grp_firstTuple = ExecCopySlotHeapTuple(outerslot);
							break;
						}
					}
				}
			}

			/*
			 * Use the representative input tuple for any references to
			 * non-aggregated input columns in aggregate direct args, the node
			 * qual, and the tlist.  (If we are not grouping, and there are no
			 * input rows at all, we will come here with an empty firstSlot
			 * ... but if not grouping, there can't be any references to
			 * non-aggregated input columns, so no problem.)
			 */
			econtext->ecxt_outertuple = firstSlot;
		}

		Assert(aggstate->projected_set >= 0);

		currentSet = aggstate->projected_set;

		prepare_projection_slot(aggstate, econtext->ecxt_outertuple, currentSet);

		select_current_set(aggstate, currentSet, false);

		finalize_aggregates(aggstate,
							peragg,
							pergroups[currentSet]);

		/*
		 * If there's no row to project right now, we must continue rather
		 * than returning a null since there might be more groups.
		 */
		result = project_aggregates(aggstate);
		if (result)
			return result;
	}

	/* No more groups */
	return NULL;
}

/*
 * ExecAgg for hashed case: read input and build hash table
 */
static void
agg_fill_hash_table(AggState *aggstate)
{
	TupleTableSlot *outerslot;
	ExprContext *tmpcontext = aggstate->tmpcontext;

	/*
	 * Process each outer-plan tuple, and then fetch the next one, until we
	 * exhaust the outer plan.
	 */
	for (;;)
	{
		outerslot = fetch_input_tuple(aggstate);
		if (TupIsNull(outerslot))
			break;

		/* set up for lookup_hash_entries and advance_aggregates */
		tmpcontext->ecxt_outertuple = outerslot;

		/* Find or build hashtable entries */
		lookup_hash_entries(aggstate);

		/* Advance the aggregates (or combine functions) */
		advance_aggregates(aggstate);

		/*
		 * Reset per-input-tuple context after each tuple, but note that the
		 * hash lookups do this too
		 */
		ResetExprContext(aggstate->tmpcontext);
	}

	/* finalize spills, if any */
	hashagg_finish_initial_spills(aggstate);

	aggstate->table_filled = true;
	/* Initialize to walk the first hash table */
	select_current_set(aggstate, 0, true);
	ResetTupleHashIterator(aggstate->perhash[0].hashtable,
						   &aggstate->perhash[0].hashiter);
}

/*
 * If any data was spilled during hash aggregation, reset the hash table and
 * reprocess one batch of spilled data. After reprocessing a batch, the hash
 * table will again contain data, ready to be consumed by
 * agg_retrieve_hash_table_in_memory().
 *
 * Should only be called after all in memory hash table entries have been
 * finalized and emitted.
 *
 * Return false when input is exhausted and there's no more work to be done;
 * otherwise return true.
 */
static bool
agg_refill_hash_table(AggState *aggstate)
{
	HashAggBatch *batch;
	AggStatePerHash perhash;
	HashAggSpill spill;
	HashTapeInfo *tapeinfo = aggstate->hash_tapeinfo;
	bool		spill_initialized = false;

	if (aggstate->hash_batches == NIL)
		return false;

	batch = linitial(aggstate->hash_batches);
	aggstate->hash_batches = list_delete_first(aggstate->hash_batches);

	hash_agg_set_limits(aggstate->hashentrysize, batch->input_card,
						batch->used_bits, &aggstate->hash_mem_limit,
						&aggstate->hash_ngroups_limit, NULL);

	/*
	 * Each batch only processes one grouping set; set the rest to NULL so
	 * that advance_aggregates() knows to ignore them. We don't touch
	 * pergroups for sorted grouping sets here, because they will be needed if
	 * we rescan later. The expressions for sorted grouping sets will not be
	 * evaluated after we recompile anyway.
	 */
	MemSet(aggstate->hash_pergroup, 0,
		   sizeof(AggStatePerGroup) * aggstate->num_hashes);

	/* free memory and reset hash tables */
	ReScanExprContext(aggstate->hashcontext);
	for (int setno = 0; setno < aggstate->num_hashes; setno++)
		ResetTupleHashTable(aggstate->perhash[setno].hashtable);

	aggstate->hash_ngroups_current = 0;

	/*
	 * In AGG_MIXED mode, hash aggregation happens in phase 1 and the output
	 * happens in phase 0. So, we switch to phase 1 when processing a batch,
	 * and back to phase 0 after the batch is done.
	 */
	Assert(aggstate->current_phase == 0);
	if (aggstate->phase->aggstrategy == AGG_MIXED)
	{
		aggstate->current_phase = 1;
		aggstate->phase = &aggstate->phases[aggstate->current_phase];
	}

	select_current_set(aggstate, batch->setno, true);

	perhash = &aggstate->perhash[aggstate->current_set];

	/*
	 * Spilled tuples are always read back as MinimalTuples, which may be
	 * different from the outer plan, so recompile the aggregate expressions.
	 *
	 * We still need the NULL check, because we are only processing one
	 * grouping set at a time and the rest will be NULL.
	 */
	hashagg_recompile_expressions(aggstate, true, true);

	for (;;)
	{
		TupleTableSlot *spillslot = aggstate->hash_spill_rslot;
		TupleTableSlot *hashslot = perhash->hashslot;
		TupleHashEntry entry;
		MinimalTuple tuple;
		uint32		hash;
		bool		isnew = false;
		bool	   *p_isnew = aggstate->hash_spill_mode ? NULL : &isnew;

		CHECK_FOR_INTERRUPTS();

		tuple = hashagg_batch_read(batch, &hash);
		if (tuple == NULL)
			break;

		ExecStoreMinimalTuple(tuple, spillslot, true);
		aggstate->tmpcontext->ecxt_outertuple = spillslot;

		prepare_hash_slot(perhash,
						  aggstate->tmpcontext->ecxt_outertuple,
						  hashslot);
		entry = LookupTupleHashEntryHash(perhash->hashtable, hashslot,
										 p_isnew, hash);

		if (entry != NULL)
		{
			if (isnew)
				initialize_hash_entry(aggstate, perhash->hashtable, entry);
			aggstate->hash_pergroup[batch->setno] = entry->additional;
			advance_aggregates(aggstate);
		}
		else
		{
			if (!spill_initialized)
			{
				/*
				 * Avoid initializing the spill until we actually need it so
				 * that we don't assign tapes that will never be used.
				 */
				spill_initialized = true;
				hashagg_spill_init(&spill, tapeinfo, batch->used_bits,
								   batch->input_card, aggstate->hashentrysize);
			}
			/* no memory for a new group, spill */
			hashagg_spill_tuple(aggstate, &spill, spillslot, hash);

			aggstate->hash_pergroup[batch->setno] = NULL;
		}

		/*
		 * Reset per-input-tuple context after each tuple, but note that the
		 * hash lookups do this too
		 */
		ResetExprContext(aggstate->tmpcontext);
	}

	hashagg_tapeinfo_release(tapeinfo, batch->input_tapenum);

	/* change back to phase 0 */
	aggstate->current_phase = 0;
	aggstate->phase = &aggstate->phases[aggstate->current_phase];

	if (spill_initialized)
	{
		hashagg_spill_finish(aggstate, &spill, batch->setno);
		hash_agg_update_metrics(aggstate, true, spill.npartitions);
	}
	else
		hash_agg_update_metrics(aggstate, true, 0);

	aggstate->hash_spill_mode = false;

	/* prepare to walk the first hash table */
	select_current_set(aggstate, batch->setno, true);
	ResetTupleHashIterator(aggstate->perhash[batch->setno].hashtable,
						   &aggstate->perhash[batch->setno].hashiter);

	pfree(batch);

	return true;
}

/*
 * ExecAgg for hashed case: retrieving groups from hash table
 *
 * After exhausting in-memory tuples, also try refilling the hash table using
 * previously-spilled tuples. Only returns NULL after all in-memory and
 * spilled tuples are exhausted.
 */
static TupleTableSlot *
agg_retrieve_hash_table(AggState *aggstate)
{
	TupleTableSlot *result = NULL;

	while (result == NULL)
	{
		result = agg_retrieve_hash_table_in_memory(aggstate);
		if (result == NULL)
		{
			if (!agg_refill_hash_table(aggstate))
			{
				aggstate->agg_done = true;
				break;
			}
		}
	}

	return result;
}

/*
 * Retrieve the groups from the in-memory hash tables without considering any
 * spilled tuples.
 */
static TupleTableSlot *
agg_retrieve_hash_table_in_memory(AggState *aggstate)
{
	ExprContext *econtext;
	AggStatePerAgg peragg;
	AggStatePerGroup pergroup;
	TupleHashEntryData *entry;
	TupleTableSlot *firstSlot;
	TupleTableSlot *result;
	AggStatePerHash perhash;

	/*
	 * get state info from node.
	 *
	 * econtext is the per-output-tuple expression context.
	 */
	econtext = aggstate->ss.ps.ps_ExprContext;
	peragg = aggstate->peragg;
	firstSlot = aggstate->ss.ss_ScanTupleSlot;

	/*
	 * Note that perhash (and therefore anything accessed through it) can
	 * change inside the loop, as we change between grouping sets.
	 */
	perhash = &aggstate->perhash[aggstate->current_set];

	/*
	 * We loop retrieving groups until we find one satisfying
	 * aggstate->ss.ps.qual
	 */
	for (;;)
	{
		TupleTableSlot *hashslot = perhash->hashslot;
		int			i;

		CHECK_FOR_INTERRUPTS();

		/*
		 * Find the next entry in the hash table
		 */
		entry = ScanTupleHashTable(perhash->hashtable, &perhash->hashiter);
		if (entry == NULL)
		{
			int			nextset = aggstate->current_set + 1;

			if (nextset < aggstate->num_hashes)
			{
				/*
				 * Switch to next grouping set, reinitialize, and restart the
				 * loop.
				 */
				select_current_set(aggstate, nextset, true);

				perhash = &aggstate->perhash[aggstate->current_set];

				ResetTupleHashIterator(perhash->hashtable, &perhash->hashiter);

				continue;
			}
			else
			{
				return NULL;
			}
		}

		/*
		 * Clear the per-output-tuple context for each group
		 *
		 * We intentionally don't use ReScanExprContext here; if any aggs have
		 * registered shutdown callbacks, they mustn't be called yet, since we
		 * might not be done with that agg.
		 */
		ResetExprContext(econtext);

		/*
		 * Transform representative tuple back into one with the right
		 * columns.
		 */
		ExecStoreMinimalTuple(entry->firstTuple, hashslot, false);
		slot_getallattrs(hashslot);

		ExecClearTuple(firstSlot);
		memset(firstSlot->tts_isnull, true,
			   firstSlot->tts_tupleDescriptor->natts * sizeof(bool));

		for (i = 0; i < perhash->numhashGrpCols; i++)
		{
			int			varNumber = perhash->hashGrpColIdxInput[i] - 1;

			firstSlot->tts_values[varNumber] = hashslot->tts_values[i];
			firstSlot->tts_isnull[varNumber] = hashslot->tts_isnull[i];
		}
		ExecStoreVirtualTuple(firstSlot);

		pergroup = (AggStatePerGroup) entry->additional;

		/*
		 * Use the representative input tuple for any references to
		 * non-aggregated input columns in the qual and tlist.
		 */
		econtext->ecxt_outertuple = firstSlot;

		prepare_projection_slot(aggstate,
								econtext->ecxt_outertuple,
								aggstate->current_set);

		finalize_aggregates(aggstate, peragg, pergroup);

		result = project_aggregates(aggstate);
		if (result)
			return result;
	}

	/* No more groups */
	return NULL;
}

/*
 * Initialize HashTapeInfo
 */
static void
hashagg_tapeinfo_init(AggState *aggstate)
{
	HashTapeInfo *tapeinfo = palloc(sizeof(HashTapeInfo));
	int			init_tapes = 16;	/* expanded dynamically */

	tapeinfo->tapeset = LogicalTapeSetCreate(init_tapes, true, NULL, NULL, -1);
	tapeinfo->ntapes = init_tapes;
	tapeinfo->nfreetapes = init_tapes;
	tapeinfo->freetapes_alloc = init_tapes;
	tapeinfo->freetapes = palloc(init_tapes * sizeof(int));
	for (int i = 0; i < init_tapes; i++)
		tapeinfo->freetapes[i] = i;

	aggstate->hash_tapeinfo = tapeinfo;
}

/*
 * Assign unused tapes to spill partitions, extending the tape set if
 * necessary.
 */
static void
hashagg_tapeinfo_assign(HashTapeInfo *tapeinfo, int *partitions,
						int npartitions)
{
	int			partidx = 0;

	/* use free tapes if available */
	while (partidx < npartitions && tapeinfo->nfreetapes > 0)
		partitions[partidx++] = tapeinfo->freetapes[--tapeinfo->nfreetapes];

	if (partidx < npartitions)
	{
		LogicalTapeSetExtend(tapeinfo->tapeset, npartitions - partidx);

		while (partidx < npartitions)
			partitions[partidx++] = tapeinfo->ntapes++;
	}
}

/*
 * After a tape has already been written to and then read, this function
 * rewinds it for writing and adds it to the free list.
 */
static void
hashagg_tapeinfo_release(HashTapeInfo *tapeinfo, int tapenum)
{
	/* rewinding frees the buffer while not in use */
	LogicalTapeRewindForWrite(tapeinfo->tapeset, tapenum);
	if (tapeinfo->freetapes_alloc == tapeinfo->nfreetapes)
	{
		tapeinfo->freetapes_alloc <<= 1;
		tapeinfo->freetapes = repalloc(tapeinfo->freetapes,
									   tapeinfo->freetapes_alloc * sizeof(int));
	}
	tapeinfo->freetapes[tapeinfo->nfreetapes++] = tapenum;
}

/*
 * hashagg_spill_init
 *
 * Called after we determined that spilling is necessary. Chooses the number
 * of partitions to create, and initializes them.
 */
static void
hashagg_spill_init(HashAggSpill *spill, HashTapeInfo *tapeinfo, int used_bits,
				   double input_groups, double hashentrysize)
{
	int			npartitions;
	int			partition_bits;

	npartitions = hash_choose_num_partitions(input_groups, hashentrysize,
											 used_bits, &partition_bits);

	spill->partitions = palloc0(sizeof(int) * npartitions);
	spill->ntuples = palloc0(sizeof(int64) * npartitions);
	spill->hll_card = palloc0(sizeof(hyperLogLogState) * npartitions);

	hashagg_tapeinfo_assign(tapeinfo, spill->partitions, npartitions);

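	/*
	 * Hash bits are consumed from the top down: used_bits were taken by any
	 * enclosing spill levels, and this level claims the next partition_bits
	 * below them.  For example, with used_bits = 0 and partition_bits = 2
	 * (npartitions = 4), shift is 30 and the mask selects bits 30-31 of the
	 * hash value.
	 */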
	spill->tapeset = tapeinfo->tapeset;
	spill->shift = 32 - used_bits - partition_bits;
	spill->mask = (npartitions - 1) << spill->shift;
	spill->npartitions = npartitions;

	for (int i = 0; i < npartitions; i++)
		initHyperLogLog(&spill->hll_card[i], HASHAGG_HLL_BIT_WIDTH);
}

/*
 * hashagg_spill_tuple
 *
 * No room for new groups in the hash table. Save for later in the appropriate
 * partition.
 */
static Size
hashagg_spill_tuple(AggState *aggstate, HashAggSpill *spill,
					TupleTableSlot *inputslot, uint32 hash)
{
	LogicalTapeSet *tapeset = spill->tapeset;
	TupleTableSlot *spillslot;
	int			partition;
	MinimalTuple tuple;
	int			tapenum;
	int			total_written = 0;
	bool		shouldFree;

	Assert(spill->partitions != NULL);

	/* spill only attributes that we actually need */
	if (!aggstate->all_cols_needed)
	{
		spillslot = aggstate->hash_spill_wslot;
		slot_getsomeattrs(inputslot, aggstate->max_colno_needed);
		ExecClearTuple(spillslot);
		for (int i = 0; i < spillslot->tts_tupleDescriptor->natts; i++)
		{
			if (bms_is_member(i + 1, aggstate->colnos_needed))
			{
				spillslot->tts_values[i] = inputslot->tts_values[i];
				spillslot->tts_isnull[i] = inputslot->tts_isnull[i];
			}
			else
				spillslot->tts_isnull[i] = true;
		}
		ExecStoreVirtualTuple(spillslot);
	}
	else
		spillslot = inputslot;

	tuple = ExecFetchSlotMinimalTuple(spillslot, &shouldFree);

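	/* take this level's reserved slice of the hash bits as the partition */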
	partition = (hash & spill->mask) >> spill->shift;
	spill->ntuples[partition]++;

	/*
	 * All hash values destined for a given partition have some bits in
	 * common, which causes bad HLL cardinality estimates. Hash the hash to
	 * get a more uniform distribution.
	 */
	addHyperLogLog(&spill->hll_card[partition], hash_bytes_uint32(hash));

	tapenum = spill->partitions[partition];

	LogicalTapeWrite(tapeset, tapenum, (void *) &hash, sizeof(uint32));
	total_written += sizeof(uint32);

	LogicalTapeWrite(tapeset, tapenum, (void *) tuple, tuple->t_len);
	total_written += tuple->t_len;

	if (shouldFree)
		pfree(tuple);

	return total_written;
}

/*
 * hashagg_batch_new
 *
 * Construct a HashAggBatch item, which represents one iteration of HashAgg to
 * be done.
 */
static HashAggBatch *
hashagg_batch_new(LogicalTapeSet *tapeset, int tapenum, int setno,
				  int64 input_tuples, double input_card, int used_bits)
{
	HashAggBatch *batch = palloc0(sizeof(HashAggBatch));

	batch->setno = setno;
	batch->used_bits = used_bits;
	batch->tapeset = tapeset;
	batch->input_tapenum = tapenum;
	batch->input_tuples = input_tuples;
	batch->input_card = input_card;

	return batch;
}

/*
 * hashagg_batch_read
 * 		read the next tuple from a batch's tape.  Return NULL if no more.
 */
static MinimalTuple
hashagg_batch_read(HashAggBatch *batch, uint32 *hashp)
{
	LogicalTapeSet *tapeset = batch->tapeset;
	int			tapenum = batch->input_tapenum;
	MinimalTuple tuple;
	uint32		t_len;
	size_t		nread;
	uint32		hash;

	nread = LogicalTapeRead(tapeset, tapenum, &hash, sizeof(uint32));
	if (nread == 0)
		return NULL;
	if (nread != sizeof(uint32))
		ereport(ERROR,
				(errcode_for_file_access(),
				 errmsg("unexpected EOF for tape %d: requested %zu bytes, read %zu bytes",
						tapenum, sizeof(uint32), nread)));
	if (hashp != NULL)
		*hashp = hash;

	nread = LogicalTapeRead(tapeset, tapenum, &t_len, sizeof(t_len));
	if (nread != sizeof(uint32))
		ereport(ERROR,
				(errcode_for_file_access(),
				 errmsg("unexpected EOF for tape %d: requested %zu bytes, read %zu bytes",
						tapenum, sizeof(uint32), nread)));

	tuple = (MinimalTuple) palloc(t_len);
	tuple->t_len = t_len;

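	/*
	 * t_len is the leading uint32 of a MinimalTuple and was already read
	 * from the tape above, so only the remaining t_len - sizeof(uint32)
	 * bytes need to be fetched into the space just past it.
	 */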
	nread = LogicalTapeRead(tapeset, tapenum,
							(void *) ((char *) tuple + sizeof(uint32)),
							t_len - sizeof(uint32));
	if (nread != t_len - sizeof(uint32))
		ereport(ERROR,
				(errcode_for_file_access(),
				 errmsg("unexpected EOF for tape %d: requested %zu bytes, read %zu bytes",
						tapenum, t_len - sizeof(uint32), nread)));

	return tuple;
}

/*
 * hashagg_finish_initial_spills
 *
 * After a HashAggBatch has been processed, it may have spilled tuples to
 * disk. If so, turn the spilled partitions into new batches that must later
 * be executed.
 */
static void
hashagg_finish_initial_spills(AggState *aggstate)
{
	int			setno;
	int			total_npartitions = 0;

	if (aggstate->hash_spills != NULL)
	{
		for (setno = 0; setno < aggstate->num_hashes; setno++)
		{
			HashAggSpill *spill = &aggstate->hash_spills[setno];

			total_npartitions += spill->npartitions;
			hashagg_spill_finish(aggstate, spill, setno);
		}

		/*
		 * We're not processing tuples from outer plan any more; only
		 * processing batches of spilled tuples. The initial spill structures
		 * are no longer needed.
		 */
		pfree(aggstate->hash_spills);
		aggstate->hash_spills = NULL;
	}

	hash_agg_update_metrics(aggstate, false, total_npartitions);
	aggstate->hash_spill_mode = false;
}

/*
 * hashagg_spill_finish
 *
 * Transform spill partitions into new batches.
 */
static void
hashagg_spill_finish(AggState *aggstate, HashAggSpill *spill, int setno)
{
	int			i;
	int			used_bits = 32 - spill->shift;

	if (spill->npartitions == 0)
		return;					/* didn't spill */

	for (i = 0; i < spill->npartitions; i++)
	{
		LogicalTapeSet *tapeset = aggstate->hash_tapeinfo->tapeset;
		int			tapenum = spill->partitions[i];
		HashAggBatch *new_batch;
		double		cardinality;

		/* if the partition is empty, don't create a new batch of work */
		if (spill->ntuples[i] == 0)
			continue;

		cardinality = estimateHyperLogLog(&spill->hll_card[i]);
		freeHyperLogLog(&spill->hll_card[i]);

		/* rewinding frees the buffer while not in use */
		LogicalTapeRewindForRead(tapeset, tapenum,
								 HASHAGG_READ_BUFFER_SIZE);

		new_batch = hashagg_batch_new(tapeset, tapenum, setno,
									  spill->ntuples[i], cardinality,
									  used_bits);
		aggstate->hash_batches = lcons(new_batch, aggstate->hash_batches);
		aggstate->hash_batches_used++;
	}

	pfree(spill->ntuples);
	pfree(spill->hll_card);
	pfree(spill->partitions);
}

/*
 * Free resources related to a spilled HashAgg.
 */
static void
hashagg_reset_spill_state(AggState *aggstate)
{
	ListCell   *lc;

	/* free spills from initial pass */
	if (aggstate->hash_spills != NULL)
	{
		int			setno;

		for (setno = 0; setno < aggstate->num_hashes; setno++)
		{
			HashAggSpill *spill = &aggstate->hash_spills[setno];

			pfree(spill->ntuples);
			pfree(spill->partitions);
		}
		pfree(aggstate->hash_spills);
		aggstate->hash_spills = NULL;
	}

	/* free batches */
	foreach(lc, aggstate->hash_batches)
	{
		HashAggBatch *batch = (HashAggBatch *) lfirst(lc);

		pfree(batch);
	}
	list_free(aggstate->hash_batches);
	aggstate->hash_batches = NIL;

	/* close tape set */
	if (aggstate->hash_tapeinfo != NULL)
	{
		HashTapeInfo *tapeinfo = aggstate->hash_tapeinfo;

		LogicalTapeSetClose(tapeinfo->tapeset);
		pfree(tapeinfo->freetapes);
		pfree(tapeinfo);
		aggstate->hash_tapeinfo = NULL;
	}
}


/* -----------------
 * ExecInitAgg
 *
 *	Creates the run-time information for the agg node produced by the
 *	planner and initializes its outer subtree.
 *
 * -----------------
 */
AggState *
ExecInitAgg(Agg *node, EState *estate, int eflags)
{
	AggState   *aggstate;
	AggStatePerAgg peraggs;
	AggStatePerTrans pertransstates;
	AggStatePerGroup *pergroups;
	Plan	   *outerPlan;
	ExprContext *econtext;
	TupleDesc	scanDesc;
	int			max_aggno;
	int			max_transno;
	int			numaggrefs;
	int			numaggs;
	int			numtrans;
	int			phase;
	int			phaseidx;
	ListCell   *l;
	Bitmapset  *all_grouped_cols = NULL;
	int			numGroupingSets = 1;
	int			numPhases;
	int			numHashes;
	int			i = 0;
	int			j = 0;
	bool		use_hashing = (node->aggstrategy == AGG_HASHED ||
							   node->aggstrategy == AGG_MIXED);

	/* check for unsupported flags */
	Assert(!(eflags & (EXEC_FLAG_BACKWARD | EXEC_FLAG_MARK)));

	/*
	 * create state structure
	 */
	aggstate = makeNode(AggState);
	aggstate->ss.ps.plan = (Plan *) node;
	aggstate->ss.ps.state = estate;
	aggstate->ss.ps.ExecProcNode = ExecAgg;

	aggstate->aggs = NIL;
	aggstate->numaggs = 0;
	aggstate->numtrans = 0;
	aggstate->aggstrategy = node->aggstrategy;
	aggstate->aggsplit = node->aggsplit;
	aggstate->maxsets = 0;
	aggstate->projected_set = -1;
	aggstate->current_set = 0;
	aggstate->peragg = NULL;
	aggstate->pertrans = NULL;
	aggstate->curperagg = NULL;
	aggstate->curpertrans = NULL;
	aggstate->input_done = false;
	aggstate->agg_done = false;
	aggstate->pergroups = NULL;
	aggstate->grp_firstTuple = NULL;
	aggstate->sort_in = NULL;
	aggstate->sort_out = NULL;

	/*
	 * phases[0] always exists, but is dummy in sorted/plain mode
	 */
	numPhases = (use_hashing ? 1 : 2);
	numHashes = (use_hashing ? 1 : 0);

	/*
	 * Calculate the maximum number of grouping sets in any phase; this
	 * determines the size of some allocations.  Also calculate the number of
	 * phases, since all hashed/mixed nodes contribute to only a single phase.
	 */
	if (node->groupingSets)
	{
		numGroupingSets = list_length(node->groupingSets);

		foreach(l, node->chain)
		{
			Agg		   *agg = lfirst(l);

			numGroupingSets = Max(numGroupingSets,
								  list_length(agg->groupingSets));

			/*
			 * additional AGG_HASHED aggs become part of phase 0, but all
			 * others add an extra phase.
			 */
			if (agg->aggstrategy != AGG_HASHED)
				++numPhases;
			else
				++numHashes;
		}
	}
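
	/*
	 * For example, an AGG_MIXED plan whose chain holds two AGG_SORTED nodes
	 * and one AGG_HASHED node ends up with numPhases = 3 (the shared hash
	 * phase 0 plus two sort phases) and numHashes = 2.
	 */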

	aggstate->maxsets = numGroupingSets;
	aggstate->numphases = numPhases;

	aggstate->aggcontexts = (ExprContext **)
		palloc0(sizeof(ExprContext *) * numGroupingSets);

	/*
	 * Create expression contexts.  We need three or more, one for
	 * per-input-tuple processing, one for per-output-tuple processing, one
	 * for all the hashtables, and one for each grouping set.  The per-tuple
	 * memory context of the per-grouping-set ExprContexts (aggcontexts)
	 * replaces the standalone memory context formerly used to hold transition
	 * values.  We cheat a little by using ExecAssignExprContext() to build
	 * all of them.
	 *
	 * NOTE: the details of what is stored in aggcontexts and what is stored
	 * in the regular per-query memory context are driven by a simple
	 * decision: we want to reset the aggcontext at group boundaries (if not
	 * hashing) and in ExecReScanAgg to recover no-longer-wanted space.
	 */
	ExecAssignExprContext(estate, &aggstate->ss.ps);
	aggstate->tmpcontext = aggstate->ss.ps.ps_ExprContext;

	for (i = 0; i < numGroupingSets; ++i)
	{
		ExecAssignExprContext(estate, &aggstate->ss.ps);
		aggstate->aggcontexts[i] = aggstate->ss.ps.ps_ExprContext;
	}

	if (use_hashing)
		aggstate->hashcontext = CreateWorkExprContext(estate);

	ExecAssignExprContext(estate, &aggstate->ss.ps);

	/*
	 * Initialize child nodes.
	 *
	 * If we are doing a hashed aggregation then the child plan does not need
	 * to handle REWIND efficiently; see ExecReScanAgg.
	 */
	if (node->aggstrategy == AGG_HASHED)
		eflags &= ~EXEC_FLAG_REWIND;
	outerPlan = outerPlan(node);
	outerPlanState(aggstate) = ExecInitNode(outerPlan, estate, eflags);

	/*
	 * initialize source tuple type.
	 */
	aggstate->ss.ps.outerops =
		ExecGetResultSlotOps(outerPlanState(&aggstate->ss),
							 &aggstate->ss.ps.outeropsfixed);
	aggstate->ss.ps.outeropsset = true;

	ExecCreateScanSlotFromOuterPlan(estate, &aggstate->ss,
									aggstate->ss.ps.outerops);
	scanDesc = aggstate->ss.ss_ScanTupleSlot->tts_tupleDescriptor;

	/*
	 * If there are more than two phases (including a potential dummy phase
	 * 0), input will be resorted using tuplesort. Need a slot for that.
	 */
	if (numPhases > 2)
	{
		aggstate->sort_slot = ExecInitExtraTupleSlot(estate, scanDesc,
													 &TTSOpsMinimalTuple);

		/*
		 * The output of the tuplesort, and the output from the outer child
		 * might not use the same type of slot. In most cases the child will
		 * be a Sort, and thus return a TTSOpsMinimalTuple type slot - but the
		 * input can also be presorted due to an index, in which case it could
		 * be
		 * a different type of slot.
		 *
		 * XXX: For efficiency it would be good to instead/additionally
		 * generate expressions with corresponding settings of outerops* for
		 * the individual phases - deforming is often a bottleneck for
		 * aggregations with lots of rows per group. If there's multiple
		 * sorts, we know that all but the first use TTSOpsMinimalTuple (via
		 * the nodeAgg.c internal tuplesort).
		 */
		if (aggstate->ss.ps.outeropsfixed &&
			aggstate->ss.ps.outerops != &TTSOpsMinimalTuple)
			aggstate->ss.ps.outeropsfixed = false;
	}

	/*
	 * Initialize result type, slot and projection.
	 */
	ExecInitResultTupleSlotTL(&aggstate->ss.ps, &TTSOpsVirtual);
	ExecAssignProjectionInfo(&aggstate->ss.ps, NULL);

	/*
	 * initialize child expressions
	 *
	 * We expect the parser to have checked that no aggs contain other agg
	 * calls in their arguments (and just to be sure, we verify it again while
	 * initializing the plan node).  This would make no sense under SQL
	 * semantics, and it's forbidden by the spec.  Because it is true, we
	 * don't need to worry about evaluating the aggs in any particular order.
	 *
	 * Note: execExpr.c finds Aggrefs for us, and adds them to aggstate->aggs.
	 * Aggrefs in the qual are found here; Aggrefs in the targetlist are found
	 * during ExecAssignProjectionInfo, above.
	 */
	aggstate->ss.ps.qual =
		ExecInitQual(node->plan.qual, (PlanState *) aggstate);

	/*
	 * We should now have found all Aggrefs in the targetlist and quals.
	 */
	numaggrefs = list_length(aggstate->aggs);
	max_aggno = -1;
	max_transno = -1;
	foreach(l, aggstate->aggs)
	{
		Aggref	   *aggref = (Aggref *) lfirst(l);

		max_aggno = Max(max_aggno, aggref->aggno);
		max_transno = Max(max_transno, aggref->aggtransno);
	}
	numaggs = max_aggno + 1;
	numtrans = max_transno + 1;

	/*
	 * For each phase, prepare grouping set data and fmgr lookup data for
	 * compare functions.  Accumulate all_grouped_cols in passing.
	 */
	aggstate->phases = palloc0(numPhases * sizeof(AggStatePerPhaseData));

	aggstate->num_hashes = numHashes;
	if (numHashes)
	{
		aggstate->perhash = palloc0(sizeof(AggStatePerHashData) * numHashes);
		aggstate->phases[0].numsets = 0;
		aggstate->phases[0].gset_lengths = palloc(numHashes * sizeof(int));
		aggstate->phases[0].grouped_cols = palloc(numHashes * sizeof(Bitmapset *));
	}

	phase = 0;
	for (phaseidx = 0; phaseidx <= list_length(node->chain); ++phaseidx)
	{
		Agg		   *aggnode;
		Sort	   *sortnode;

		if (phaseidx > 0)
		{
			aggnode = list_nth_node(Agg, node->chain, phaseidx - 1);
			sortnode = castNode(Sort, aggnode->plan.lefttree);
		}
		else
		{
			aggnode = node;
			sortnode = NULL;
		}

		Assert(phase <= 1 || sortnode);

		if (aggnode->aggstrategy == AGG_HASHED
			|| aggnode->aggstrategy == AGG_MIXED)
		{
			AggStatePerPhase phasedata = &aggstate->phases[0];
			AggStatePerHash perhash;
			Bitmapset  *cols = NULL;

			Assert(phase == 0);
			i = phasedata->numsets++;
			perhash = &aggstate->perhash[i];

			/* phase 0 always points to the "real" Agg in the hash case */
			phasedata->aggnode = node;
			phasedata->aggstrategy = node->aggstrategy;

			/* but the actual Agg node representing this hash is saved here */
			perhash->aggnode = aggnode;

			phasedata->gset_lengths[i] = perhash->numCols = aggnode->numCols;

			for (j = 0; j < aggnode->numCols; ++j)
				cols = bms_add_member(cols, aggnode->grpColIdx[j]);

			phasedata->grouped_cols[i] = cols;

			all_grouped_cols = bms_add_members(all_grouped_cols, cols);
			continue;
		}
		else
		{
			AggStatePerPhase phasedata = &aggstate->phases[++phase];
			int			num_sets;

			phasedata->numsets = num_sets = list_length(aggnode->groupingSets);

			if (num_sets)
			{
				phasedata->gset_lengths = palloc(num_sets * sizeof(int));
				phasedata->grouped_cols = palloc(num_sets * sizeof(Bitmapset *));

				i = 0;
				foreach(l, aggnode->groupingSets)
				{
					int			current_length = list_length(lfirst(l));
					Bitmapset  *cols = NULL;

					/* planner forces this to be correct */
					for (j = 0; j < current_length; ++j)
						cols = bms_add_member(cols, aggnode->grpColIdx[j]);

					phasedata->grouped_cols[i] = cols;
					phasedata->gset_lengths[i] = current_length;

					++i;
				}

				all_grouped_cols = bms_add_members(all_grouped_cols,
												   phasedata->grouped_cols[0]);
			}
			else
			{
				Assert(phaseidx == 0);

				phasedata->gset_lengths = NULL;
				phasedata->grouped_cols = NULL;
			}

			/*
			 * If we are grouping, precompute fmgr lookup data for inner loop.
			 */
			if (aggnode->aggstrategy == AGG_SORTED)
			{
				int			i = 0;

				Assert(aggnode->numCols > 0);

				/*
				 * Build a separate function for each subset of columns that
				 * need to be compared.
				 */
				phasedata->eqfunctions =
					(ExprState **) palloc0(aggnode->numCols * sizeof(ExprState *));

				/* for each grouping set */
				for (i = 0; i < phasedata->numsets; i++)
				{
					int			length = phasedata->gset_lengths[i];

					if (phasedata->eqfunctions[length - 1] != NULL)
						continue;

					phasedata->eqfunctions[length - 1] =
						execTuplesMatchPrepare(scanDesc,
											   length,
											   aggnode->grpColIdx,
											   aggnode->grpOperators,
											   aggnode->grpCollations,
											   (PlanState *) aggstate);
				}

				/* and for all grouped columns, unless already computed */
				if (phasedata->eqfunctions[aggnode->numCols - 1] == NULL)
				{
					phasedata->eqfunctions[aggnode->numCols - 1] =
						execTuplesMatchPrepare(scanDesc,
											   aggnode->numCols,
											   aggnode->grpColIdx,
											   aggnode->grpOperators,
											   aggnode->grpCollations,
											   (PlanState *) aggstate);
				}
			}

			phasedata->aggnode = aggnode;
			phasedata->aggstrategy = aggnode->aggstrategy;
			phasedata->sortnode = sortnode;
		}
	}

	/*
	 * Convert all_grouped_cols to a descending-order list.
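	 * (bms_next_member scans the set in ascending order; prepending each
	 * member with lcons_int leaves the final list in descending order.)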
	 */
	i = -1;
	while ((i = bms_next_member(all_grouped_cols, i)) >= 0)
		aggstate->all_grouped_cols = lcons_int(i, aggstate->all_grouped_cols);

	/*
	 * Set up aggregate-result storage in the output expr context, and also
	 * allocate my private per-agg working storage
	 */
	econtext = aggstate->ss.ps.ps_ExprContext;
	econtext->ecxt_aggvalues = (Datum *) palloc0(sizeof(Datum) * numaggs);
	econtext->ecxt_aggnulls = (bool *) palloc0(sizeof(bool) * numaggs);

	peraggs = (AggStatePerAgg) palloc0(sizeof(AggStatePerAggData) * numaggs);
	pertransstates = (AggStatePerTrans) palloc0(sizeof(AggStatePerTransData) * numtrans);

	aggstate->peragg = peraggs;
	aggstate->pertrans = pertransstates;


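	/*
	 * The first numGroupingSets entries of all_pergroups serve the
	 * sort-based grouping sets; the remaining numHashes entries are used by
	 * the hash tables (see the hashing setup below).
	 */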
	aggstate->all_pergroups =
		(AggStatePerGroup *) palloc0(sizeof(AggStatePerGroup)
									 * (numGroupingSets + numHashes));
	pergroups = aggstate->all_pergroups;

	if (node->aggstrategy != AGG_HASHED)
	{
		for (i = 0; i < numGroupingSets; i++)
		{
			pergroups[i] = (AggStatePerGroup) palloc0(sizeof(AggStatePerGroupData)
													  * numaggs);
		}

		aggstate->pergroups = pergroups;
		pergroups += numGroupingSets;
	}

	/*
	 * Hashing can only appear in the initial phase.
	 */
	if (use_hashing)
	{
		Plan	   *outerplan = outerPlan(node);
		uint64		totalGroups = 0;
		int			i;

		aggstate->hash_metacxt = AllocSetContextCreate(aggstate->ss.ps.state->es_query_cxt,
													   "HashAgg meta context",
													   ALLOCSET_DEFAULT_SIZES);
		aggstate->hash_spill_rslot = ExecInitExtraTupleSlot(estate, scanDesc,
															&TTSOpsMinimalTuple);
		aggstate->hash_spill_wslot = ExecInitExtraTupleSlot(estate, scanDesc,
															&TTSOpsVirtual);

		/* this is an array of pointers, not structures */
		aggstate->hash_pergroup = pergroups;

		aggstate->hashentrysize = hash_agg_entry_size(aggstate->numtrans,
													  outerplan->plan_width,
													  node->transitionSpace);

		/*
		 * Consider all of the grouping sets together when setting the limits
		 * and estimating the number of partitions. This can be inaccurate
		 * when there is more than one grouping set, but should still be
		 * reasonable.
		 */
		for (i = 0; i < aggstate->num_hashes; i++)
			totalGroups += aggstate->perhash[i].aggnode->numGroups;

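		/* used_bits is zero here: no hash bits have been consumed yet */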
		hash_agg_set_limits(aggstate->hashentrysize, totalGroups, 0,
							&aggstate->hash_mem_limit,
							&aggstate->hash_ngroups_limit,
							&aggstate->hash_planned_partitions);
		find_hash_columns(aggstate);

		/* Skip massive memory allocation if we are just doing EXPLAIN */
		if (!(eflags & EXEC_FLAG_EXPLAIN_ONLY))
			build_hash_tables(aggstate);

		aggstate->table_filled = false;

		/* Initialize this to 1, meaning nothing spilled, yet */
		aggstate->hash_batches_used = 1;
	}

	/*
	 * Initialize current phase-dependent values to initial phase. The initial
	 * phase is 1 (first sort pass) for all strategies that use sorting (if
	 * hashing is being done too, then phase 0 is processed last); but if only
	 * hashing is being done, then phase 0 is all there is.
	 */
	if (node->aggstrategy == AGG_HASHED)
	{
		aggstate->current_phase = 0;
		initialize_phase(aggstate, 0);
		select_current_set(aggstate, 0, true);
	}
	else
	{
		aggstate->current_phase = 1;
		initialize_phase(aggstate, 1);
		select_current_set(aggstate, 0, false);
	}

	/*
	 * Perform lookups of aggregate function info, and initialize the
	 * unchanging fields of the per-agg and per-trans data.
	 */
	foreach(l, aggstate->aggs)
	{
		Aggref	   *aggref = lfirst(l);
		AggStatePerAgg peragg;
		AggStatePerTrans pertrans;
		Oid			inputTypes[FUNC_MAX_ARGS];
		int			numArguments;
		int			numDirectArgs;
		HeapTuple	aggTuple;
		Form_pg_aggregate aggform;
		AclResult	aclresult;
		Oid			finalfn_oid;
		Oid			serialfn_oid,
					deserialfn_oid;
		Oid			aggOwner;
		Expr	   *finalfnexpr;
		Oid			aggtranstype;

		/* Planner should have assigned aggregate to correct level */
		Assert(aggref->agglevelsup == 0);
		/* ... and the split mode should match */
		Assert(aggref->aggsplit == aggstate->aggsplit);

		peragg = &peraggs[aggref->aggno];

		/* Check if we initialized the state for this aggregate already. */
		if (peragg->aggref != NULL)
			continue;

		peragg->aggref = aggref;
		peragg->transno = aggref->aggtransno;

		/* Fetch the pg_aggregate row */
		aggTuple = SearchSysCache1(AGGFNOID,
								   ObjectIdGetDatum(aggref->aggfnoid));
		if (!HeapTupleIsValid(aggTuple))
			elog(ERROR, "cache lookup failed for aggregate %u",
				 aggref->aggfnoid);
		aggform = (Form_pg_aggregate) GETSTRUCT(aggTuple);

		/* Check permission to call aggregate function */
		aclresult = pg_proc_aclcheck(aggref->aggfnoid, GetUserId(),
									 ACL_EXECUTE);
		if (aclresult != ACLCHECK_OK)
			aclcheck_error(aclresult, OBJECT_AGGREGATE,
						   get_func_name(aggref->aggfnoid));
		InvokeFunctionExecuteHook(aggref->aggfnoid);

		/* planner recorded transition state type in the Aggref itself */
		aggtranstype = aggref->aggtranstype;
		Assert(OidIsValid(aggtranstype));

		/* Final function only required if we're finalizing the aggregates */
		if (DO_AGGSPLIT_SKIPFINAL(aggstate->aggsplit))
			peragg->finalfn_oid = finalfn_oid = InvalidOid;
		else
			peragg->finalfn_oid = finalfn_oid = aggform->aggfinalfn;

		serialfn_oid = InvalidOid;
		deserialfn_oid = InvalidOid;

		/*
		 * Check if serialization/deserialization is required.  We only do it
		 * for aggregates that have transtype INTERNAL.
		 */
		if (aggtranstype == INTERNALOID)
		{
			/*
			 * The planner should only have generated a serialize agg node if
			 * every aggregate with an INTERNAL state has a serialization
			 * function.  Verify that.
			 */
			if (DO_AGGSPLIT_SERIALIZE(aggstate->aggsplit))
			{
				/* serialization only valid when not running finalfn */
				Assert(DO_AGGSPLIT_SKIPFINAL(aggstate->aggsplit));

				if (!OidIsValid(aggform->aggserialfn))
					elog(ERROR, "serialfunc not provided for serialization aggregation");
				serialfn_oid = aggform->aggserialfn;
			}

			/* Likewise for deserialization functions */
			if (DO_AGGSPLIT_DESERIALIZE(aggstate->aggsplit))
			{
				/* deserialization only valid when combining states */
				Assert(DO_AGGSPLIT_COMBINE(aggstate->aggsplit));

				if (!OidIsValid(aggform->aggdeserialfn))
					elog(ERROR, "deserialfunc not provided for deserialization aggregation");
				deserialfn_oid = aggform->aggdeserialfn;
			}
		}

		/* Check that aggregate owner has permission to call component fns */
		{
			HeapTuple	procTuple;

			procTuple = SearchSysCache1(PROCOID,
										ObjectIdGetDatum(aggref->aggfnoid));
			if (!HeapTupleIsValid(procTuple))
				elog(ERROR, "cache lookup failed for function %u",
					 aggref->aggfnoid);
			aggOwner = ((Form_pg_proc) GETSTRUCT(procTuple))->proowner;
			ReleaseSysCache(procTuple);

			if (OidIsValid(finalfn_oid))
			{
				aclresult = pg_proc_aclcheck(finalfn_oid, aggOwner,
											 ACL_EXECUTE);
				if (aclresult != ACLCHECK_OK)
					aclcheck_error(aclresult, OBJECT_FUNCTION,
								   get_func_name(finalfn_oid));
				InvokeFunctionExecuteHook(finalfn_oid);
			}
			if (OidIsValid(serialfn_oid))
			{
				aclresult = pg_proc_aclcheck(serialfn_oid, aggOwner,
											 ACL_EXECUTE);
				if (aclresult != ACLCHECK_OK)
					aclcheck_error(aclresult, OBJECT_FUNCTION,
								   get_func_name(serialfn_oid));
				InvokeFunctionExecuteHook(serialfn_oid);
			}
			if (OidIsValid(deserialfn_oid))
			{
				aclresult = pg_proc_aclcheck(deserialfn_oid, aggOwner,
											 ACL_EXECUTE);
				if (aclresult != ACLCHECK_OK)
					aclcheck_error(aclresult, OBJECT_FUNCTION,
								   get_func_name(deserialfn_oid));
				InvokeFunctionExecuteHook(deserialfn_oid);
			}
		}

		/*
		 * Get actual datatypes of the (nominal) aggregate inputs.  These
		 * could be different from the agg's declared input types, when the
		 * agg accepts ANY or a polymorphic type.
		 */
		numArguments = get_aggregate_argtypes(aggref, inputTypes);

		/* Count the "direct" arguments, if any */
		numDirectArgs = list_length(aggref->aggdirectargs);

		/* Detect how many arguments to pass to the finalfn */
		if (aggform->aggfinalextra)
			peragg->numFinalArgs = numArguments + 1;
		else
			peragg->numFinalArgs = numDirectArgs + 1;
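		/* (the + 1 in each case accounts for the transition value itself) */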

		/* Initialize any direct-argument expressions */
		peragg->aggdirectargs = ExecInitExprList(aggref->aggdirectargs,
												 (PlanState *) aggstate);

		/*
		 * build expression trees using actual argument & result types for the
		 * finalfn, if it exists and is required.
		 */
		if (OidIsValid(finalfn_oid))
		{
			build_aggregate_finalfn_expr(inputTypes,
										 peragg->numFinalArgs,
										 aggtranstype,
										 aggref->aggtype,
										 aggref->inputcollid,
										 finalfn_oid,
										 &finalfnexpr);
			fmgr_info(finalfn_oid, &peragg->finalfn);
			fmgr_info_set_expr((Node *) finalfnexpr, &peragg->finalfn);
		}

		/* get info about the output value's datatype */
		get_typlenbyval(aggref->aggtype,
						&peragg->resulttypeLen,
						&peragg->resulttypeByVal);

		/*
		 * Build working state for invoking the transition function, if we
		 * haven't done it already.
		 */
		pertrans = &pertransstates[aggref->aggtransno];
		if (pertrans->aggref == NULL)
		{
			Datum		textInitVal;
			Datum		initValue;
			bool		initValueIsNull;
			Oid			transfn_oid;

			/*
			 * If this aggregation is performing state combines, then instead
			 * of using the transition function, we'll use the combine
			 * function
			 */
			if (DO_AGGSPLIT_COMBINE(aggstate->aggsplit))
			{
				transfn_oid = aggform->aggcombinefn;

				/* If not set then the planner messed up */
				if (!OidIsValid(transfn_oid))
					elog(ERROR, "combinefn not set for aggregate function");
			}
			else
				transfn_oid = aggform->aggtransfn;

			aclresult = pg_proc_aclcheck(transfn_oid, aggOwner,
										 ACL_EXECUTE);
			if (aclresult != ACLCHECK_OK)
				aclcheck_error(aclresult, OBJECT_FUNCTION,
							   get_func_name(transfn_oid));
			InvokeFunctionExecuteHook(transfn_oid);

			/*
			 * initval is potentially null, so don't try to access it as a
			 * struct field. Must do it the hard way with SysCacheGetAttr.
			 */
			textInitVal = SysCacheGetAttr(AGGFNOID, aggTuple,
										  Anum_pg_aggregate_agginitval,
										  &initValueIsNull);
			if (initValueIsNull)
				initValue = (Datum) 0;
			else
				initValue = GetAggInitVal(textInitVal, aggtranstype);

			build_pertrans_for_aggref(pertrans, aggstate, estate,
									  aggref, transfn_oid, aggtranstype,
									  serialfn_oid, deserialfn_oid,
									  initValue, initValueIsNull,
									  inputTypes, numArguments);
		}
		else
			pertrans->aggshared = true;
		ReleaseSysCache(aggTuple);
	}

	/*
	 * Update aggstate->numaggs to be the number of unique aggregates found.
	 * Also set numtrans to the number of unique transition states found.
	 */
	aggstate->numaggs = numaggs;
	aggstate->numtrans = numtrans;

	/*
	 * Last, check whether any more aggregates got added onto the node while
	 * we processed the expressions for the aggregate arguments (including not
	 * only the regular arguments and FILTER expressions handled immediately
	 * above, but any direct arguments we might've handled earlier).  If so,
	 * we have nested aggregate functions, which is semantically nonsensical,
	 * so complain.  (This should have been caught by the parser, so we don't
	 * need to work hard on a helpful error message; but we defend against it
	 * here anyway, just to be sure.)
	 */
	if (numaggrefs != list_length(aggstate->aggs))
		ereport(ERROR,
				(errcode(ERRCODE_GROUPING_ERROR),
				 errmsg("aggregate function calls cannot be nested")));

	/*
	 * Build expressions doing all the transition work at once. We build a
	 * different one for each phase, as the number of transition function
	 * invocation can differ between phases. Note this'll work both for
	 * transition and combination functions (although there'll only be one
	 * phase in the latter case).
	 */
	for (phaseidx = 0; phaseidx < aggstate->numphases; phaseidx++)
	{
		AggStatePerPhase phase = &aggstate->phases[phaseidx];
		bool		dohash = false;
		bool		dosort = false;

		/* phase 0 doesn't necessarily exist */
		if (!phase->aggnode)
			continue;

		if (aggstate->aggstrategy == AGG_MIXED && phaseidx == 1)
		{
			/*
			 * Phase one, and only phase one, in a mixed agg performs both
			 * sorting and aggregation.
			 */
			dohash = true;
			dosort = true;
		}
		else if (aggstate->aggstrategy == AGG_MIXED && phaseidx == 0)
		{
			/*
			 * No need to compute a transition function for an AGG_MIXED phase
			 * 0 - the contents of the hashtables will have been computed
			 * during phase 1.
			 */
			continue;
		}
		else if (phase->aggstrategy == AGG_PLAIN ||
				 phase->aggstrategy == AGG_SORTED)
		{
			dohash = false;
			dosort = true;
		}
		else if (phase->aggstrategy == AGG_HASHED)
		{
			dohash = true;
			dosort = false;
		}
		else
			Assert(false);

		phase->evaltrans = ExecBuildAggTrans(aggstate, phase, dosort, dohash,
											 false);

		/* cache compiled expression for outer slot without NULL check */
		phase->evaltrans_cache[0][0] = phase->evaltrans;
	}

	return aggstate;
}

/*
 * Build the state needed to calculate a state value for an aggregate.
 *
 * This initializes all the fields in 'pertrans'. 'aggref' is the aggregate
 * to initialize the state for. 'aggtransfn', 'aggtranstype', and the rest
 * of the arguments could be calculated from 'aggref', but the caller has
 * calculated them already, so might as well pass them.
 */
static void
build_pertrans_for_aggref(AggStatePerTrans pertrans,
						  AggState *aggstate, EState *estate,
						  Aggref *aggref,
						  Oid aggtransfn, Oid aggtranstype,
						  Oid aggserialfn, Oid aggdeserialfn,
						  Datum initValue, bool initValueIsNull,
						  Oid *inputTypes, int numArguments)
{
	int			numGroupingSets = Max(aggstate->maxsets, 1);
	Expr	   *serialfnexpr = NULL;
	Expr	   *deserialfnexpr = NULL;
	ListCell   *lc;
	int			numInputs;
	int			numDirectArgs;
	List	   *sortlist;
	int			numSortCols;
	int			numDistinctCols;
	int			i;

	/* Begin filling in the pertrans data */
	pertrans->aggref = aggref;
	pertrans->aggshared = false;
	pertrans->aggCollation = aggref->inputcollid;
	pertrans->transfn_oid = aggtransfn;
	pertrans->serialfn_oid = aggserialfn;
	pertrans->deserialfn_oid = aggdeserialfn;
	pertrans->initValue = initValue;
	pertrans->initValueIsNull = initValueIsNull;

	/* Count the "direct" arguments, if any */
	numDirectArgs = list_length(aggref->aggdirectargs);

	/* Count the number of aggregated input columns */
	pertrans->numInputs = numInputs = list_length(aggref->args);

	pertrans->aggtranstype = aggtranstype;

	/*
	 * When combining states, we have no use at all for the aggregate
	 * function's transfn. Instead we use the combinefn.  In this case, the
	 * transfn and transfn_oid fields of pertrans refer to the combine
	 * function rather than the transition function.
	 */
	if (DO_AGGSPLIT_COMBINE(aggstate->aggsplit))
	{
		Expr	   *combinefnexpr;
		size_t		numTransArgs;

		/*
		 * When combining there's only one input, the to-be-combined added
		 * transition value from below (this node's transition value is
		 * counted separately).
		 */
		pertrans->numTransInputs = 1;

		/* account for the current transition state */
		numTransArgs = pertrans->numTransInputs + 1;

		build_aggregate_combinefn_expr(aggtranstype,
									   aggref->inputcollid,
									   aggtransfn,
									   &combinefnexpr);
		fmgr_info(aggtransfn, &pertrans->transfn);
		fmgr_info_set_expr((Node *) combinefnexpr, &pertrans->transfn);

		pertrans->transfn_fcinfo =
			(FunctionCallInfo) palloc(SizeForFunctionCallInfo(2));
		InitFunctionCallInfoData(*pertrans->transfn_fcinfo,
								 &pertrans->transfn,
								 numTransArgs,
								 pertrans->aggCollation,
								 (void *) aggstate, NULL);

		/*
		 * Ensure that a combine function to combine INTERNAL states is not
		 * strict. This should have been checked during CREATE AGGREGATE, but
		 * the strict property could have been changed since then.
		 */
		if (pertrans->transfn.fn_strict && aggtranstype == INTERNALOID)
			ereport(ERROR,
					(errcode(ERRCODE_INVALID_FUNCTION_DEFINITION),
					 errmsg("combine function with transition type %s must not be declared STRICT",
							format_type_be(aggtranstype))));
	}
	else
	{
		Expr	   *transfnexpr;
		size_t		numTransArgs;

		/* Detect how many arguments to pass to the transfn */
		if (AGGKIND_IS_ORDERED_SET(aggref->aggkind))
			pertrans->numTransInputs = numInputs;
		else
			pertrans->numTransInputs = numArguments;

		/* account for the current transition state */
		numTransArgs = pertrans->numTransInputs + 1;

		/*
		 * Set up infrastructure for calling the transfn.  Note that
		 * invtransfn is not needed here.
		 */
		build_aggregate_transfn_expr(inputTypes,
									 numArguments,
									 numDirectArgs,
									 aggref->aggvariadic,
									 aggtranstype,
									 aggref->inputcollid,
									 aggtransfn,
									 InvalidOid,
									 &transfnexpr,
									 NULL);
		fmgr_info(aggtransfn, &pertrans->transfn);
		fmgr_info_set_expr((Node *) transfnexpr, &pertrans->transfn);

		pertrans->transfn_fcinfo =
			(FunctionCallInfo) palloc(SizeForFunctionCallInfo(numTransArgs));
		InitFunctionCallInfoData(*pertrans->transfn_fcinfo,
								 &pertrans->transfn,
								 numTransArgs,
								 pertrans->aggCollation,
								 (void *) aggstate, NULL);

		/*
		 * If the transfn is strict and the initval is NULL, make sure input
		 * type and transtype are the same (or at least binary-compatible), so
		 * that it's OK to use the first aggregated input value as the initial
		 * transValue.  This should have been checked at agg definition time,
		 * but we must check again in case the transfn's strictness property
		 * has been changed.
		 */
		if (pertrans->transfn.fn_strict && pertrans->initValueIsNull)
		{
			if (numArguments <= numDirectArgs ||
				!IsBinaryCoercible(inputTypes[numDirectArgs],
								   aggtranstype))
				ereport(ERROR,
						(errcode(ERRCODE_INVALID_FUNCTION_DEFINITION),
						 errmsg("aggregate %u needs to have compatible input type and transition type",
								aggref->aggfnoid)));
		}
	}

	/* get info about the state value's datatype */
	get_typlenbyval(aggtranstype,
					&pertrans->transtypeLen,
					&pertrans->transtypeByVal);

	if (OidIsValid(aggserialfn))
	{
		build_aggregate_serialfn_expr(aggserialfn,
									  &serialfnexpr);
		fmgr_info(aggserialfn, &pertrans->serialfn);
		fmgr_info_set_expr((Node *) serialfnexpr, &pertrans->serialfn);

		pertrans->serialfn_fcinfo =
			(FunctionCallInfo) palloc(SizeForFunctionCallInfo(1));
		InitFunctionCallInfoData(*pertrans->serialfn_fcinfo,
								 &pertrans->serialfn,
								 1,
								 InvalidOid,
								 (void *) aggstate, NULL);
	}

	if (OidIsValid(aggdeserialfn))
	{
		build_aggregate_deserialfn_expr(aggdeserialfn,
										&deserialfnexpr);
		fmgr_info(aggdeserialfn, &pertrans->deserialfn);
		fmgr_info_set_expr((Node *) deserialfnexpr, &pertrans->deserialfn);

		pertrans->deserialfn_fcinfo =
			(FunctionCallInfo) palloc(SizeForFunctionCallInfo(2));
		InitFunctionCallInfoData(*pertrans->deserialfn_fcinfo,
								 &pertrans->deserialfn,
								 2,
								 InvalidOid,
								 (void *) aggstate, NULL);

	}

	/*
	 * If we're doing either DISTINCT or ORDER BY for a plain agg, then we
	 * have a list of SortGroupClause nodes; fish out the data in them and
	 * stick them into arrays.  We ignore ORDER BY for an ordered-set agg,
	 * however; the agg's transfn and finalfn are responsible for that.
	 *
	 * Note that by construction, if there is a DISTINCT clause then the ORDER
	 * BY clause is a prefix of it (see transformDistinctClause).
	 */
	if (AGGKIND_IS_ORDERED_SET(aggref->aggkind))
	{
		sortlist = NIL;
		numSortCols = numDistinctCols = 0;
	}
	else if (aggref->aggdistinct)
	{
		sortlist = aggref->aggdistinct;
		numSortCols = numDistinctCols = list_length(sortlist);
		Assert(numSortCols >= list_length(aggref->aggorder));
	}
	else
	{
		sortlist = aggref->aggorder;
		numSortCols = list_length(sortlist);
		numDistinctCols = 0;
	}

	pertrans->numSortCols = numSortCols;
	pertrans->numDistinctCols = numDistinctCols;

	/*
	 * If we have either sorting or filtering to do, create a tupledesc and
	 * slot corresponding to the aggregated inputs (including sort
	 * expressions) of the agg.
	 */
	if (numSortCols > 0 || aggref->aggfilter)
	{
		pertrans->sortdesc = ExecTypeFromTL(aggref->args);
		pertrans->sortslot =
			ExecInitExtraTupleSlot(estate, pertrans->sortdesc,
								   &TTSOpsMinimalTuple);
	}

	if (numSortCols > 0)
	{
		/*
		 * We don't implement DISTINCT or ORDER BY aggs in the HASHED case
		 * (yet)
		 */
		Assert(aggstate->aggstrategy != AGG_HASHED && aggstate->aggstrategy != AGG_MIXED);

		/* If we have only one input, we need its len/byval info. */
		if (numInputs == 1)
		{
			get_typlenbyval(inputTypes[numDirectArgs],
							&pertrans->inputtypeLen,
							&pertrans->inputtypeByVal);
		}
		else if (numDistinctCols > 0)
		{
			/* we will need an extra slot to store prior values */
			pertrans->uniqslot =
				ExecInitExtraTupleSlot(estate, pertrans->sortdesc,
									   &TTSOpsMinimalTuple);
		}

		/* Extract the sort information for use later */
		pertrans->sortColIdx =
			(AttrNumber *) palloc(numSortCols * sizeof(AttrNumber));
		pertrans->sortOperators =
			(Oid *) palloc(numSortCols * sizeof(Oid));
		pertrans->sortCollations =
			(Oid *) palloc(numSortCols * sizeof(Oid));
		pertrans->sortNullsFirst =
			(bool *) palloc(numSortCols * sizeof(bool));

		i = 0;
		foreach(lc, sortlist)
		{
			SortGroupClause *sortcl = (SortGroupClause *) lfirst(lc);
			TargetEntry *tle = get_sortgroupclause_tle(sortcl, aggref->args);

			/* the parser should have made sure of this */
			Assert(OidIsValid(sortcl->sortop));

			pertrans->sortColIdx[i] = tle->resno;
			pertrans->sortOperators[i] = sortcl->sortop;
			pertrans->sortCollations[i] = exprCollation((Node *) tle->expr);
			pertrans->sortNullsFirst[i] = sortcl->nulls_first;
			i++;
		}
		Assert(i == numSortCols);
	}

	if (aggref->aggdistinct)
	{
		Oid		   *ops;

		Assert(numArguments > 0);
		Assert(list_length(aggref->aggdistinct) == numDistinctCols);

		ops = palloc(numDistinctCols * sizeof(Oid));

		i = 0;
		foreach(lc, aggref->aggdistinct)
			ops[i++] = ((SortGroupClause *) lfirst(lc))->eqop;

		/* lookup / build the necessary comparators */
		if (numDistinctCols == 1)
			fmgr_info(get_opcode(ops[0]), &pertrans->equalfnOne);
		else
			pertrans->equalfnMulti =
				execTuplesMatchPrepare(pertrans->sortdesc,
									   numDistinctCols,
									   pertrans->sortColIdx,
									   ops,
									   pertrans->sortCollations,
									   &aggstate->ss.ps);
		pfree(ops);
	}

	pertrans->sortstates = (Tuplesortstate **)
		palloc0(sizeof(Tuplesortstate *) * numGroupingSets);
}


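/*
 * GetAggInitVal
 *		Convert the aggregate's textual initial value (pg_aggregate.agginitval)
 *		into a Datum of the transition type, using that type's input function.
 */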
static Datum
GetAggInitVal(Datum textInitVal, Oid transtype)
{
	Oid			typinput,
				typioparam;
	char	   *strInitVal;
	Datum		initVal;

	getTypeInputInfo(transtype, &typinput, &typioparam);
	strInitVal = TextDatumGetCString(textInitVal);
	initVal = OidInputFunctionCall(typinput, strInitVal,
								   typioparam, -1);
	pfree(strInitVal);
	return initVal;
}

void
ExecEndAgg(AggState *node)
{
	PlanState  *outerPlan;
	int			transno;
	int			numGroupingSets = Max(node->maxsets, 1);
	int			setno;

	/*
	 * When ending a parallel worker, copy the statistics gathered by the
	 * worker back into shared memory so that it can be picked up by the main
	 * process to report in EXPLAIN ANALYZE.
	 */
	if (node->shared_info && IsParallelWorker())
	{
		AggregateInstrumentation *si;

		Assert(ParallelWorkerNumber <= node->shared_info->num_workers);
		si = &node->shared_info->sinstrument[ParallelWorkerNumber];
		si->hash_batches_used = node->hash_batches_used;
		si->hash_disk_used = node->hash_disk_used;
		si->hash_mem_peak = node->hash_mem_peak;
	}

	/* Make sure we have closed any open tuplesorts */

	if (node->sort_in)
		tuplesort_end(node->sort_in);
	if (node->sort_out)
		tuplesort_end(node->sort_out);

	hashagg_reset_spill_state(node);

	if (node->hash_metacxt != NULL)
	{
		MemoryContextDelete(node->hash_metacxt);
		node->hash_metacxt = NULL;
	}

	for (transno = 0; transno < node->numtrans; transno++)
	{
		AggStatePerTrans pertrans = &node->pertrans[transno];

		for (setno = 0; setno < numGroupingSets; setno++)
		{
			if (pertrans->sortstates[setno])
				tuplesort_end(pertrans->sortstates[setno]);
		}
	}

	/* And ensure any agg shutdown callbacks have been called */
	for (setno = 0; setno < numGroupingSets; setno++)
		ReScanExprContext(node->aggcontexts[setno]);
	if (node->hashcontext)
		ReScanExprContext(node->hashcontext);

	/*
	 * We don't actually free any ExprContexts here (see comment in
	 * ExecFreeExprContext), just unlinking the output one from the plan node
	 * suffices.
	 */
	ExecFreeExprContext(&node->ss.ps);

	/* clean up tuple table */
	ExecClearTuple(node->ss.ss_ScanTupleSlot);

	outerPlan = outerPlanState(node);
	ExecEndNode(outerPlan);
}

void
ExecReScanAgg(AggState *node)
{
	ExprContext *econtext = node->ss.ps.ps_ExprContext;
	PlanState  *outerPlan = outerPlanState(node);
	Agg		   *aggnode = (Agg *) node->ss.ps.plan;
	int			transno;
	int			numGroupingSets = Max(node->maxsets, 1);
	int			setno;

	node->agg_done = false;

	if (node->aggstrategy == AGG_HASHED)
	{
		/*
		 * In the hashed case, if we haven't yet built the hash table then we
		 * can just return; nothing done yet, so nothing to undo. If subnode's
		 * chgParam is not NULL then it will be re-scanned by ExecProcNode,
		 * else no reason to re-scan it at all.
		 */
		if (!node->table_filled)
			return;

		/*
		 * If we do have the hash table, and it never spilled, and the subplan
		 * does not have any parameter changes, and none of our own parameter
		 * changes affect input expressions of the aggregated functions, then
		 * we can just rescan the existing hash table; no need to build it
		 * again.
		 */
		if (outerPlan->chgParam == NULL && !node->hash_ever_spilled &&
			!bms_overlap(node->ss.ps.chgParam, aggnode->aggParams))
		{
			ResetTupleHashIterator(node->perhash[0].hashtable,
								   &node->perhash[0].hashiter);
			select_current_set(node, 0, true);
			return;
		}
	}

	/* Make sure we have closed any open tuplesorts */
	for (transno = 0; transno < node->numtrans; transno++)
	{
		for (setno = 0; setno < numGroupingSets; setno++)
		{
			AggStatePerTrans pertrans = &node->pertrans[transno];

			if (pertrans->sortstates[setno])
			{
				tuplesort_end(pertrans->sortstates[setno]);
				pertrans->sortstates[setno] = NULL;
			}
		}
	}

	/*
	 * We don't need to ReScanExprContext the output tuple context here;
	 * ExecReScan already did it. But we do need to reset our per-grouping-set
	 * contexts, which may have transvalues stored in them. (We use rescan
	 * rather than just reset because transfns may have registered callbacks
	 * that need to be run now.) For the AGG_HASHED case, see below.
	 */

	for (setno = 0; setno < numGroupingSets; setno++)
	{
		ReScanExprContext(node->aggcontexts[setno]);
	}
4511

	/* Release first tuple of group, if we have made a copy */
	if (node->grp_firstTuple != NULL)
	{
		heap_freetuple(node->grp_firstTuple);
		node->grp_firstTuple = NULL;
	}
	ExecClearTuple(node->ss.ss_ScanTupleSlot);

	/* Forget current agg values */
	MemSet(econtext->ecxt_aggvalues, 0, sizeof(Datum) * node->numaggs);
	MemSet(econtext->ecxt_aggnulls, 0, sizeof(bool) * node->numaggs);

	/*
	 * With AGG_HASHED/MIXED, the hash table is allocated in a sub-context of
	 * the hashcontext. This used to be an issue, but now, resetting a context
	 * automatically deletes sub-contexts too.
	 */
	if (node->aggstrategy == AGG_HASHED || node->aggstrategy == AGG_MIXED)
	{
		hashagg_reset_spill_state(node);

		node->hash_ever_spilled = false;
		node->hash_spill_mode = false;
		node->hash_ngroups_current = 0;

		ReScanExprContext(node->hashcontext);
		/* Rebuild an empty hash table */
		build_hash_tables(node);
		node->table_filled = false;
		/* iterator will be reset when the table is filled */

		hashagg_recompile_expressions(node, false, false);
	}

	if (node->aggstrategy != AGG_HASHED)
	{
		/*
		 * Reset the per-group state (in particular, mark transvalues null)
		 */
		for (setno = 0; setno < numGroupingSets; setno++)
		{
			MemSet(node->pergroups[setno], 0,
				   sizeof(AggStatePerGroupData) * node->numaggs);
		}

		/* reset to phase 1 */
		initialize_phase(node, 1);

		node->input_done = false;
		node->projected_set = -1;
	}

	if (outerPlan->chgParam == NULL)
		ExecReScan(outerPlan);
}

/***********************************************************************
 * API exposed to aggregate functions
 ***********************************************************************/

/*
 * AggCheckCallContext - test if a SQL function is being called as an aggregate
 *
 * The transition and/or final functions of an aggregate may want to verify
 * that they are being called as aggregates, rather than as plain SQL
 * functions.  They should use this function to do so.  The return value
 * is nonzero if being called as an aggregate, or zero if not.  (Specific
 * nonzero values are AGG_CONTEXT_AGGREGATE or AGG_CONTEXT_WINDOW, but more
 * values could conceivably appear in future.)
 *
 * If aggcontext isn't NULL, the function also stores at *aggcontext the
 * identity of the memory context that aggregate transition values are being
 * stored in.  Note that the same aggregate call site (flinfo) may be called
 * interleaved on different transition values in different contexts, so it's
 * not kosher to cache aggcontext under fn_extra.  It is, however, kosher to
 * cache it in the transvalue itself (for internal-type transvalues).
 */
int
AggCheckCallContext(FunctionCallInfo fcinfo, MemoryContext *aggcontext)
{
	if (fcinfo->context && IsA(fcinfo->context, AggState))
	{
		if (aggcontext)
		{
			AggState   *aggstate = ((AggState *) fcinfo->context);
			ExprContext *cxt = aggstate->curaggcontext;

			*aggcontext = cxt->ecxt_per_tuple_memory;
		}
		return AGG_CONTEXT_AGGREGATE;
	}
	if (fcinfo->context && IsA(fcinfo->context, WindowAggState))
	{
		if (aggcontext)
			*aggcontext = ((WindowAggState *) fcinfo->context)->curaggcontext;
		return AGG_CONTEXT_WINDOW;
	}

	/* this is just to prevent "uninitialized variable" warnings */
	if (aggcontext)
		*aggcontext = NULL;
	return 0;
}
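
/*
 * For example, a transition function that must keep its state alive across
 * calls for the current group can allocate that state in the context
 * reported by AggCheckCallContext.  A minimal sketch (my_count_transfn is
 * hypothetical, not part of this file):
 *
 *		Datum
 *		my_count_transfn(PG_FUNCTION_ARGS)
 *		{
 *			MemoryContext aggcontext;
 *			int64	   *state;
 *
 *			if (!AggCheckCallContext(fcinfo, &aggcontext))
 *				elog(ERROR, "my_count_transfn called in non-aggregate context");
 *
 *			if (PG_ARGISNULL(0))
 *				state = (int64 *) MemoryContextAllocZero(aggcontext,
 *														 sizeof(int64));
 *			else
 *				state = (int64 *) PG_GETARG_POINTER(0);
 *
 *			(*state)++;
 *			PG_RETURN_POINTER(state);
 *		}
 */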

/*
 * AggGetAggref - allow an aggregate support function to get its Aggref
 *
 * If the function is being called as an aggregate support function,
 * return the Aggref node for the aggregate call.  Otherwise, return NULL.
 *
 * Aggregates sharing the same inputs and transition functions can get
 * merged into a single transition calculation.  If the transition function
 * calls AggGetAggref, it will get one of the Aggrefs for which it is
 * executing.  It must therefore not pay attention to the Aggref fields that
 * relate to the final function, as those are indeterminate.  But if a final
 * function calls AggGetAggref, it will get its own aggregate's Aggref.
 *
 * Note that if an aggregate is being used as a window function, this will
 * return NULL.  We could provide a similar function to return the relevant
 * WindowFunc node in such cases, but it's not needed yet.
 */
Aggref *
AggGetAggref(FunctionCallInfo fcinfo)
{
	if (fcinfo->context && IsA(fcinfo->context, AggState))
	{
		AggState   *aggstate = (AggState *) fcinfo->context;
		AggStatePerAgg curperagg;
		AggStatePerTrans curpertrans;

		/* check curperagg (valid when in a final function) */
		curperagg = aggstate->curperagg;

		if (curperagg)
			return curperagg->aggref;

		/* check curpertrans (valid when in a transition function) */
		curpertrans = aggstate->curpertrans;

		if (curpertrans)
			return curpertrans->aggref;
	}
	return NULL;
}
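
/*
 * For instance, a final function can consult the Aggref to learn how the
 * aggregate's arguments were specified.  A minimal sketch (my_finalfn is
 * hypothetical, not part of this file):
 *
 *		Aggref	   *aggref = AggGetAggref(fcinfo);
 *		int			nargs;
 *
 *		if (!aggref)
 *			elog(ERROR, "my_finalfn called in non-aggregate context");
 *		nargs = list_length(aggref->args);
 */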

/*
 * AggGetTempMemoryContext - fetch short-term memory context for aggregates
 *
 * This is useful in agg final functions; the context returned is one that
 * the final function can safely reset as desired.  This isn't useful for
 * transition functions, since the context returned MAY (we don't promise)
 * be the same as the context those are called in.
 *
 * As above, this is currently not useful for aggs called as window functions.
 */
MemoryContext
AggGetTempMemoryContext(FunctionCallInfo fcinfo)
{
	if (fcinfo->context && IsA(fcinfo->context, AggState))
	{
		AggState   *aggstate = (AggState *) fcinfo->context;

		return aggstate->tmpcontext->ecxt_per_tuple_memory;
	}
	return NULL;
}
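
/*
 * For example, a final function can do its scratch work in this context and
 * reset it when done.  A minimal sketch (build_short_lived_working_data is a
 * hypothetical helper, not part of this file):
 *
 *		MemoryContext tmpcontext = AggGetTempMemoryContext(fcinfo);
 *		MemoryContext oldcontext;
 *
 *		if (tmpcontext == NULL)
 *			elog(ERROR, "my_finalfn called in non-aggregate context");
 *		oldcontext = MemoryContextSwitchTo(tmpcontext);
 *		build_short_lived_working_data();
 *		MemoryContextSwitchTo(oldcontext);
 *		MemoryContextReset(tmpcontext);
 */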

/*
 * AggStateIsShared - find out whether transition state is shared
 *
 * If the function is being called as an aggregate support function,
 * return true if the aggregate's transition state is shared across
 * multiple aggregates, false if it is not.
 *
 * Returns true if not called as an aggregate support function.
 * This is intended as a conservative answer, ie "no you'd better not
 * scribble on your input".  In particular, will return true if the
 * aggregate is being used as a window function, which is a scenario
 * in which changing the transition state is a bad idea.  We might
 * want to refine the behavior for the window case in future.
 */
bool
AggStateIsShared(FunctionCallInfo fcinfo)
{
	if (fcinfo->context && IsA(fcinfo->context, AggState))
	{
		AggState   *aggstate = (AggState *) fcinfo->context;
		AggStatePerAgg curperagg;
		AggStatePerTrans curpertrans;

		/* check curperagg (valid when in a final function) */
		curperagg = aggstate->curperagg;

		if (curperagg)
			return aggstate->pertrans[curperagg->transno].aggshared;

		/* check curpertrans (valid when in a transition function) */
		curpertrans = aggstate->curpertrans;

		if (curpertrans)
			return curpertrans->aggshared;
	}
	return true;
}
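
/*
 * For example, a final function that wants to modify the transition state
 * in place should first check whether it is the state's sole user; if the
 * state is shared, it must work on a copy instead.  A minimal sketch
 * (copy_my_trans_state and finalize_in_place are hypothetical helpers):
 *
 *		if (AggStateIsShared(fcinfo))
 *			state = copy_my_trans_state(state);
 *		finalize_in_place(state);
 */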

/*
 * AggRegisterCallback - register a cleanup callback for an aggregate
 *
 * This is useful for aggs to register shutdown callbacks, which will ensure
 * that non-memory resources are freed.  The callback will occur just before
 * the associated aggcontext (as returned by AggCheckCallContext) is reset,
 * either between groups or as a result of rescanning the query.  The callback
 * will NOT be called on error paths.  The typical use-case is for freeing of
 * tuplestores or tuplesorts maintained in aggcontext, or pins held by slots
 * created by the agg functions.  (The callback will not be called until after
 * the result of the finalfn is no longer needed, so it's safe for the finalfn
 * to return data that will be freed by the callback.)
 *
 * As above, this is currently not useful for aggs called as window functions.
 */
void
AggRegisterCallback(FunctionCallInfo fcinfo,
					ExprContextCallbackFunction func,
					Datum arg)
{
	if (fcinfo->context && IsA(fcinfo->context, AggState))
	{
		AggState   *aggstate = (AggState *) fcinfo->context;
		ExprContext *cxt = aggstate->curaggcontext;

		RegisterExprContextCallback(cxt, func, arg);

		return;
	}
	elog(ERROR, "aggregate function cannot register a callback in this context");
}
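
/*
 * For example, an aggregate that keeps a tuplesort in aggcontext can ensure
 * it is closed when the group's context is reset.  A minimal sketch
 * (my_sort_shutdown is hypothetical, not part of this file):
 *
 *		static void
 *		my_sort_shutdown(Datum arg)
 *		{
 *			Tuplesortstate *sort = (Tuplesortstate *) DatumGetPointer(arg);
 *
 *			tuplesort_end(sort);
 *		}
 *
 * and, from the transition function, after creating the tuplesort:
 *
 *		AggRegisterCallback(fcinfo, my_sort_shutdown, PointerGetDatum(sort));
 */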


/* ----------------------------------------------------------------
 *						Parallel Query Support
 * ----------------------------------------------------------------
 */

/* ----------------------------------------------------------------
 *		ExecAggEstimate
 *
 *		Estimate space required to propagate aggregate statistics.
 * ----------------------------------------------------------------
 */
void
ExecAggEstimate(AggState *node, ParallelContext *pcxt)
{
	Size		size;

	/* don't need this if not instrumenting or no workers */
	if (!node->ss.ps.instrument || pcxt->nworkers == 0)
		return;

	size = mul_size(pcxt->nworkers, sizeof(AggregateInstrumentation));
	size = add_size(size, offsetof(SharedAggInfo, sinstrument));
	shm_toc_estimate_chunk(&pcxt->estimator, size);
	shm_toc_estimate_keys(&pcxt->estimator, 1);
}

/* ----------------------------------------------------------------
 *		ExecAggInitializeDSM
 *
 *		Initialize DSM space for aggregate statistics.
 * ----------------------------------------------------------------
 */
void
ExecAggInitializeDSM(AggState *node, ParallelContext *pcxt)
{
	Size		size;

	/* don't need this if not instrumenting or no workers */
	if (!node->ss.ps.instrument || pcxt->nworkers == 0)
		return;

	size = offsetof(SharedAggInfo, sinstrument)
		+ pcxt->nworkers * sizeof(AggregateInstrumentation);
	node->shared_info = shm_toc_allocate(pcxt->toc, size);
	/* ensure any unfilled slots will contain zeroes */
	memset(node->shared_info, 0, size);
	node->shared_info->num_workers = pcxt->nworkers;
	shm_toc_insert(pcxt->toc, node->ss.ps.plan->plan_node_id,
				   node->shared_info);
}

/* ----------------------------------------------------------------
 *		ExecAggInitializeWorker
 *
 *		Attach worker to DSM space for aggregate statistics.
 * ----------------------------------------------------------------
 */
void
ExecAggInitializeWorker(AggState *node, ParallelWorkerContext *pwcxt)
{
	node->shared_info =
		shm_toc_lookup(pwcxt->toc, node->ss.ps.plan->plan_node_id, true);
}

/* ----------------------------------------------------------------
 *		ExecAggRetrieveInstrumentation
 *
 *		Transfer aggregate statistics from DSM to private memory.
 * ----------------------------------------------------------------
 */
void
ExecAggRetrieveInstrumentation(AggState *node)
{
	Size		size;
	SharedAggInfo *si;

	if (node->shared_info == NULL)
		return;

	size = offsetof(SharedAggInfo, sinstrument)
		+ node->shared_info->num_workers * sizeof(AggregateInstrumentation);
	si = palloc(size);
	memcpy(si, node->shared_info, size);
	node->shared_info = si;
}