Commit 9878b643 authored by Jeff Davis

HashAgg: use better cardinality estimate for recursive spilling.

Use HyperLogLog to estimate the group cardinality in a spilled
partition. This estimate is used to choose the number of partitions if
we recurse.

The previous behavior was to use the number of tuples in a spilled
partition as the estimate for the number of groups, which led to
overpartitioning. That could cause the number of batches to be much
higher than expected (with each batch being very small), which made it
harder to interpret EXPLAIN ANALYZE results.

Reviewed-by: Peter Geoghegan
Discussion: https://postgr.es/m/a856635f9284bc36f7a77d02f47bbb6aaf7b59b3.camel@j-davis.com
Backpatch-through: 13
parent f2130e77
...@@ -245,9 +245,11 @@ ...@@ -245,9 +245,11 @@
#include "catalog/pg_aggregate.h" #include "catalog/pg_aggregate.h"
#include "catalog/pg_proc.h" #include "catalog/pg_proc.h"
#include "catalog/pg_type.h" #include "catalog/pg_type.h"
#include "common/hashfn.h"
#include "executor/execExpr.h" #include "executor/execExpr.h"
#include "executor/executor.h" #include "executor/executor.h"
#include "executor/nodeAgg.h" #include "executor/nodeAgg.h"
#include "lib/hyperloglog.h"
#include "miscadmin.h" #include "miscadmin.h"
#include "nodes/makefuncs.h" #include "nodes/makefuncs.h"
#include "nodes/nodeFuncs.h" #include "nodes/nodeFuncs.h"
...@@ -295,6 +297,14 @@ ...@@ -295,6 +297,14 @@
#define HASHAGG_READ_BUFFER_SIZE BLCKSZ #define HASHAGG_READ_BUFFER_SIZE BLCKSZ
#define HASHAGG_WRITE_BUFFER_SIZE BLCKSZ #define HASHAGG_WRITE_BUFFER_SIZE BLCKSZ
/*
* HyperLogLog is used for estimating the cardinality of the spilled tuples in
* a given partition. 5 bits corresponds to a size of about 32 bytes and a
* worst-case error of around 18%. That's effective enough to choose a
* reasonable number of partitions when recursing.
*/
#define HASHAGG_HLL_BIT_WIDTH 5
/* /*
* Estimate chunk overhead as a constant 16 bytes. XXX: should this be * Estimate chunk overhead as a constant 16 bytes. XXX: should this be
* improved? * improved?
...@@ -339,6 +349,7 @@ typedef struct HashAggSpill ...@@ -339,6 +349,7 @@ typedef struct HashAggSpill
int64 *ntuples; /* number of tuples in each partition */ int64 *ntuples; /* number of tuples in each partition */
uint32 mask; /* mask to find partition from hash value */ uint32 mask; /* mask to find partition from hash value */
int shift; /* after masking, shift by this amount */ int shift; /* after masking, shift by this amount */
hyperLogLogState *hll_card; /* cardinality estimate for contents */
} HashAggSpill; } HashAggSpill;
/* /*
...@@ -357,6 +368,7 @@ typedef struct HashAggBatch ...@@ -357,6 +368,7 @@ typedef struct HashAggBatch
LogicalTapeSet *tapeset; /* borrowed reference to tape set */ LogicalTapeSet *tapeset; /* borrowed reference to tape set */
int input_tapenum; /* input partition tape */ int input_tapenum; /* input partition tape */
int64 input_tuples; /* number of tuples in this batch */ int64 input_tuples; /* number of tuples in this batch */
double input_card; /* estimated group cardinality */
} HashAggBatch; } HashAggBatch;
/* used to find referenced colnos */ /* used to find referenced colnos */
...@@ -411,7 +423,7 @@ static void hashagg_recompile_expressions(AggState *aggstate, bool minslot, ...@@ -411,7 +423,7 @@ static void hashagg_recompile_expressions(AggState *aggstate, bool minslot,
static long hash_choose_num_buckets(double hashentrysize, static long hash_choose_num_buckets(double hashentrysize,
long estimated_nbuckets, long estimated_nbuckets,
Size memory); Size memory);
static int hash_choose_num_partitions(uint64 input_groups, static int hash_choose_num_partitions(double input_groups,
double hashentrysize, double hashentrysize,
int used_bits, int used_bits,
int *log2_npartittions); int *log2_npartittions);
...@@ -432,10 +444,11 @@ static void hashagg_finish_initial_spills(AggState *aggstate); ...@@ -432,10 +444,11 @@ static void hashagg_finish_initial_spills(AggState *aggstate);
static void hashagg_reset_spill_state(AggState *aggstate); static void hashagg_reset_spill_state(AggState *aggstate);
static HashAggBatch *hashagg_batch_new(LogicalTapeSet *tapeset, static HashAggBatch *hashagg_batch_new(LogicalTapeSet *tapeset,
int input_tapenum, int setno, int input_tapenum, int setno,
int64 input_tuples, int used_bits); int64 input_tuples, double input_card,
int used_bits);
static MinimalTuple hashagg_batch_read(HashAggBatch *batch, uint32 *hashp); static MinimalTuple hashagg_batch_read(HashAggBatch *batch, uint32 *hashp);
static void hashagg_spill_init(HashAggSpill *spill, HashTapeInfo *tapeinfo, static void hashagg_spill_init(HashAggSpill *spill, HashTapeInfo *tapeinfo,
int used_bits, uint64 input_tuples, int used_bits, double input_groups,
double hashentrysize); double hashentrysize);
static Size hashagg_spill_tuple(AggState *aggstate, HashAggSpill *spill, static Size hashagg_spill_tuple(AggState *aggstate, HashAggSpill *spill,
TupleTableSlot *slot, uint32 hash); TupleTableSlot *slot, uint32 hash);
...@@ -1777,7 +1790,7 @@ hashagg_recompile_expressions(AggState *aggstate, bool minslot, bool nullcheck) ...@@ -1777,7 +1790,7 @@ hashagg_recompile_expressions(AggState *aggstate, bool minslot, bool nullcheck)
* substantially larger than the initial value. * substantially larger than the initial value.
*/ */
void void
hash_agg_set_limits(double hashentrysize, uint64 input_groups, int used_bits, hash_agg_set_limits(double hashentrysize, double input_groups, int used_bits,
Size *mem_limit, uint64 *ngroups_limit, Size *mem_limit, uint64 *ngroups_limit,
int *num_partitions) int *num_partitions)
{ {
...@@ -1969,7 +1982,7 @@ hash_choose_num_buckets(double hashentrysize, long ngroups, Size memory) ...@@ -1969,7 +1982,7 @@ hash_choose_num_buckets(double hashentrysize, long ngroups, Size memory)
* *log2_npartitions to the log2() of the number of partitions. * *log2_npartitions to the log2() of the number of partitions.
*/ */
static int static int
hash_choose_num_partitions(uint64 input_groups, double hashentrysize, hash_choose_num_partitions(double input_groups, double hashentrysize,
int used_bits, int *log2_npartitions) int used_bits, int *log2_npartitions)
{ {
Size mem_wanted; Size mem_wanted;
...@@ -2574,7 +2587,6 @@ agg_refill_hash_table(AggState *aggstate) ...@@ -2574,7 +2587,6 @@ agg_refill_hash_table(AggState *aggstate)
AggStatePerHash perhash; AggStatePerHash perhash;
HashAggSpill spill; HashAggSpill spill;
HashTapeInfo *tapeinfo = aggstate->hash_tapeinfo; HashTapeInfo *tapeinfo = aggstate->hash_tapeinfo;
uint64 ngroups_estimate;
bool spill_initialized = false; bool spill_initialized = false;
if (aggstate->hash_batches == NIL) if (aggstate->hash_batches == NIL)
...@@ -2583,16 +2595,7 @@ agg_refill_hash_table(AggState *aggstate) ...@@ -2583,16 +2595,7 @@ agg_refill_hash_table(AggState *aggstate)
batch = linitial(aggstate->hash_batches); batch = linitial(aggstate->hash_batches);
aggstate->hash_batches = list_delete_first(aggstate->hash_batches); aggstate->hash_batches = list_delete_first(aggstate->hash_batches);
/* hash_agg_set_limits(aggstate->hashentrysize, batch->input_card,
* Estimate the number of groups for this batch as the total number of
* tuples in its input file. Although that's a worst case, it's not bad
* here for two reasons: (1) overestimating is better than
* underestimating; and (2) we've already scanned the relation once, so
* it's likely that we've already finalized many of the common values.
*/
ngroups_estimate = batch->input_tuples;
hash_agg_set_limits(aggstate->hashentrysize, ngroups_estimate,
batch->used_bits, &aggstate->hash_mem_limit, batch->used_bits, &aggstate->hash_mem_limit,
&aggstate->hash_ngroups_limit, NULL); &aggstate->hash_ngroups_limit, NULL);
...@@ -2678,7 +2681,7 @@ agg_refill_hash_table(AggState *aggstate) ...@@ -2678,7 +2681,7 @@ agg_refill_hash_table(AggState *aggstate)
*/ */
spill_initialized = true; spill_initialized = true;
hashagg_spill_init(&spill, tapeinfo, batch->used_bits, hashagg_spill_init(&spill, tapeinfo, batch->used_bits,
ngroups_estimate, aggstate->hashentrysize); batch->input_card, aggstate->hashentrysize);
} }
/* no memory for a new group, spill */ /* no memory for a new group, spill */
hashagg_spill_tuple(aggstate, &spill, spillslot, hash); hashagg_spill_tuple(aggstate, &spill, spillslot, hash);
...@@ -2936,7 +2939,7 @@ hashagg_tapeinfo_release(HashTapeInfo *tapeinfo, int tapenum) ...@@ -2936,7 +2939,7 @@ hashagg_tapeinfo_release(HashTapeInfo *tapeinfo, int tapenum)
*/ */
static void static void
hashagg_spill_init(HashAggSpill *spill, HashTapeInfo *tapeinfo, int used_bits, hashagg_spill_init(HashAggSpill *spill, HashTapeInfo *tapeinfo, int used_bits,
uint64 input_groups, double hashentrysize) double input_groups, double hashentrysize)
{ {
int npartitions; int npartitions;
int partition_bits; int partition_bits;
...@@ -2946,6 +2949,7 @@ hashagg_spill_init(HashAggSpill *spill, HashTapeInfo *tapeinfo, int used_bits, ...@@ -2946,6 +2949,7 @@ hashagg_spill_init(HashAggSpill *spill, HashTapeInfo *tapeinfo, int used_bits,
spill->partitions = palloc0(sizeof(int) * npartitions); spill->partitions = palloc0(sizeof(int) * npartitions);
spill->ntuples = palloc0(sizeof(int64) * npartitions); spill->ntuples = palloc0(sizeof(int64) * npartitions);
spill->hll_card = palloc0(sizeof(hyperLogLogState) * npartitions);
hashagg_tapeinfo_assign(tapeinfo, spill->partitions, npartitions); hashagg_tapeinfo_assign(tapeinfo, spill->partitions, npartitions);
...@@ -2953,6 +2957,9 @@ hashagg_spill_init(HashAggSpill *spill, HashTapeInfo *tapeinfo, int used_bits, ...@@ -2953,6 +2957,9 @@ hashagg_spill_init(HashAggSpill *spill, HashTapeInfo *tapeinfo, int used_bits,
spill->shift = 32 - used_bits - partition_bits; spill->shift = 32 - used_bits - partition_bits;
spill->mask = (npartitions - 1) << spill->shift; spill->mask = (npartitions - 1) << spill->shift;
spill->npartitions = npartitions; spill->npartitions = npartitions;
for (int i = 0; i < npartitions; i++)
initHyperLogLog(&spill->hll_card[i], HASHAGG_HLL_BIT_WIDTH);
} }
/* /*
...@@ -3001,6 +3008,13 @@ hashagg_spill_tuple(AggState *aggstate, HashAggSpill *spill, ...@@ -3001,6 +3008,13 @@ hashagg_spill_tuple(AggState *aggstate, HashAggSpill *spill,
partition = (hash & spill->mask) >> spill->shift; partition = (hash & spill->mask) >> spill->shift;
spill->ntuples[partition]++; spill->ntuples[partition]++;
/*
* All hash values destined for a given partition have some bits in
* common, which causes bad HLL cardinality estimates. Hash the hash to
* get a more uniform distribution.
*/
addHyperLogLog(&spill->hll_card[partition], hash_bytes_uint32(hash));
tapenum = spill->partitions[partition]; tapenum = spill->partitions[partition];
LogicalTapeWrite(tapeset, tapenum, (void *) &hash, sizeof(uint32)); LogicalTapeWrite(tapeset, tapenum, (void *) &hash, sizeof(uint32));
...@@ -3023,7 +3037,7 @@ hashagg_spill_tuple(AggState *aggstate, HashAggSpill *spill, ...@@ -3023,7 +3037,7 @@ hashagg_spill_tuple(AggState *aggstate, HashAggSpill *spill,
*/ */
static HashAggBatch * static HashAggBatch *
hashagg_batch_new(LogicalTapeSet *tapeset, int tapenum, int setno, hashagg_batch_new(LogicalTapeSet *tapeset, int tapenum, int setno,
int64 input_tuples, int used_bits) int64 input_tuples, double input_card, int used_bits)
{ {
HashAggBatch *batch = palloc0(sizeof(HashAggBatch)); HashAggBatch *batch = palloc0(sizeof(HashAggBatch));
...@@ -3032,6 +3046,7 @@ hashagg_batch_new(LogicalTapeSet *tapeset, int tapenum, int setno, ...@@ -3032,6 +3046,7 @@ hashagg_batch_new(LogicalTapeSet *tapeset, int tapenum, int setno,
batch->tapeset = tapeset; batch->tapeset = tapeset;
batch->input_tapenum = tapenum; batch->input_tapenum = tapenum;
batch->input_tuples = input_tuples; batch->input_tuples = input_tuples;
batch->input_card = input_card;
return batch; return batch;
} }
...@@ -3135,21 +3150,26 @@ hashagg_spill_finish(AggState *aggstate, HashAggSpill *spill, int setno) ...@@ -3135,21 +3150,26 @@ hashagg_spill_finish(AggState *aggstate, HashAggSpill *spill, int setno)
for (i = 0; i < spill->npartitions; i++) for (i = 0; i < spill->npartitions; i++)
{ {
int tapenum = spill->partitions[i]; int tapenum = spill->partitions[i];
HashAggBatch *new_batch; HashAggBatch *new_batch;
double cardinality;
/* if the partition is empty, don't create a new batch of work */ /* if the partition is empty, don't create a new batch of work */
if (spill->ntuples[i] == 0) if (spill->ntuples[i] == 0)
continue; continue;
cardinality = estimateHyperLogLog(&spill->hll_card[i]);
freeHyperLogLog(&spill->hll_card[i]);
new_batch = hashagg_batch_new(aggstate->hash_tapeinfo->tapeset, new_batch = hashagg_batch_new(aggstate->hash_tapeinfo->tapeset,
tapenum, setno, spill->ntuples[i], tapenum, setno, spill->ntuples[i],
used_bits); cardinality, used_bits);
aggstate->hash_batches = lcons(new_batch, aggstate->hash_batches); aggstate->hash_batches = lcons(new_batch, aggstate->hash_batches);
aggstate->hash_batches_used++; aggstate->hash_batches_used++;
} }
pfree(spill->ntuples); pfree(spill->ntuples);
pfree(spill->hll_card);
pfree(spill->partitions); pfree(spill->partitions);
} }
......
...@@ -320,7 +320,7 @@ extern void ExecReScanAgg(AggState *node); ...@@ -320,7 +320,7 @@ extern void ExecReScanAgg(AggState *node);
extern Size hash_agg_entry_size(int numTrans, Size tupleWidth, extern Size hash_agg_entry_size(int numTrans, Size tupleWidth,
Size transitionSpace); Size transitionSpace);
extern void hash_agg_set_limits(double hashentrysize, uint64 input_groups, extern void hash_agg_set_limits(double hashentrysize, double input_groups,
int used_bits, Size *mem_limit, int used_bits, Size *mem_limit,
uint64 *ngroups_limit, int *num_partitions); uint64 *ngroups_limit, int *num_partitions);
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment