Commit 18042840 authored by Andres Freund's avatar Andres Freund

Add parallel-aware hash joins.

Introduce parallel-aware hash joins that appear in EXPLAIN plans as Parallel
Hash Join with Parallel Hash.  While hash joins could already appear in
parallel queries, they were previously always parallel-oblivious and had a
partial subplan only on the outer side, meaning that the work of the inner
subplan was duplicated in every worker.

After this commit, the planner will consider using a partial subplan on the
inner side too, using the Parallel Hash node to divide the work over the
available CPU cores and combine its results in shared memory.  If the join
needs to be split into multiple batches in order to respect work_mem, then
workers process different batches as much as possible and then work together
on the remaining batches.

The advantages of a parallel-aware hash join over a parallel-oblivious hash
join used in a parallel query are that it:

 * avoids wasting memory on duplicated hash tables
 * avoids wasting disk space on duplicated batch files
 * divides the work of building the hash table over the CPUs

One disadvantage is that there is some communication between the participating
CPUs which might outweigh the benefits of parallelism in the case of small
hash tables.  This is avoided by the planner's existing reluctance to supply
partial plans for small scans, but it may be necessary to estimate
synchronization costs in future if that situation changes.  Another is that
outer batch 0 must be written to disk if multiple batches are required.

A potential future advantage of parallel-aware hash joins is that right and
full outer joins could be supported, since there is a single set of matched
bits for each hashtable, but that is not yet implemented.

A new GUC enable_parallel_hash is defined to control the feature, defaulting
to on.

Author: Thomas Munro
Reviewed-By: Andres Freund, Robert Haas
Tested-By: Rafia Sabih, Prabhat Sahu
Discussion:
    https://postgr.es/m/CAEepm=2W=cOkiZxcg6qiFQP-dHUe09aqTrEMM7yJDrHMhDv_RA@mail.gmail.com
    https://postgr.es/m/CAEepm=37HKyJ4U6XOLi=JgfSHM3o6B-GaeO-6hkOmneTDkH+Uw@mail.gmail.com
parent f94eec49
......@@ -3647,6 +3647,21 @@ ANY <replaceable class="parameter">num_sync</replaceable> ( <replaceable class="
</listitem>
</varlistentry>
<varlistentry id="guc-enable-parallel-hash" xreflabel="enable_parallel_hash">
<term><varname>enable_parallel_hash</varname> (<type>boolean</type>)
<indexterm>
<primary><varname>enable_parallel_hash</varname> configuration parameter</primary>
</indexterm>
</term>
<listitem>
<para>
Enables or disables the query planner's use of hash-join plan
types with parallel hash. Has no effect if hash-join plans are not
also enabled. The default is <literal>on</literal>.
</para>
</listitem>
</varlistentry>
<varlistentry id="guc-enable-partition-wise-join" xreflabel="enable_partition_wise_join">
<term><varname>enable_partition_wise_join</varname> (<type>boolean</type>)
<indexterm>
......
......@@ -1263,7 +1263,7 @@ postgres 27093 0.0 0.0 30096 2752 ? Ss 11:34 0:00 postgres: ser
<entry>Waiting in an extension.</entry>
</row>
<row>
<entry morerows="17"><literal>IPC</literal></entry>
<entry morerows="32"><literal>IPC</literal></entry>
<entry><literal>BgWorkerShutdown</literal></entry>
<entry>Waiting for background worker to shut down.</entry>
</row>
......@@ -1279,6 +1279,66 @@ postgres 27093 0.0 0.0 30096 2752 ? Ss 11:34 0:00 postgres: ser
<entry><literal>ExecuteGather</literal></entry>
<entry>Waiting for activity from child process when executing <literal>Gather</literal> node.</entry>
</row>
<row>
<entry><literal>Hash/Batch/Allocating</literal></entry>
<entry>Waiting for an elected Parallel Hash participant to allocate a hash table.</entry>
</row>
<row>
<entry><literal>Hash/Batch/Electing</literal></entry>
<entry>Electing a Parallel Hash participant to allocate a hash table.</entry>
</row>
<row>
<entry><literal>Hash/Batch/Loading</literal></entry>
<entry>Waiting for other Parallel Hash participants to finish loading a hash table.</entry>
</row>
<row>
<entry><literal>Hash/Build/Allocating</literal></entry>
<entry>Waiting for an elected Parallel Hash participant to allocate the initial hash table.</entry>
</row>
<row>
<entry><literal>Hash/Build/Electing</literal></entry>
<entry>Electing a Parallel Hash participant to allocate the initial hash table.</entry>
</row>
<row>
<entry><literal>Hash/Build/HashingInner</literal></entry>
<entry>Waiting for other Parallel Hash participants to finish hashing the inner relation.</entry>
</row>
<row>
<entry><literal>Hash/Build/HashingOuter</literal></entry>
<entry>Waiting for other Parallel Hash participants to finish partitioning the outer relation.</entry>
</row>
<row>
<entry><literal>Hash/GrowBatches/Allocating</literal></entry>
<entry>Waiting for an elected Parallel Hash participant to allocate more batches.</entry>
</row>
<row>
<entry><literal>Hash/GrowBatches/Deciding</literal></entry>
<entry>Electing a Parallel Hash participant to decide on future batch growth.</entry>
</row>
<row>
<entry><literal>Hash/GrowBatches/Electing</literal></entry>
<entry>Electing a Parallel Hash participant to allocate more batches.</entry>
</row>
<row>
<entry><literal>Hash/GrowBatches/Finishing</literal></entry>
<entry>Waiting for an elected Parallel Hash participant to decide on future batch growth.</entry>
</row>
<row>
<entry><literal>Hash/GrowBatches/Repartitioning</literal></entry>
<entry>Waiting for other Parallel Hash participants to finishing repartitioning.</entry>
</row>
<row>
<entry><literal>Hash/GrowBuckets/Allocating</literal></entry>
<entry>Waiting for an elected Parallel Hash participant to finish allocating more buckets.</entry>
</row>
<row>
<entry><literal>Hash/GrowBuckets/Electing</literal></entry>
<entry>Electing a Parallel Hash participant to allocate more buckets.</entry>
</row>
<row>
<entry><literal>Hash/GrowBuckets/Reinserting</literal></entry>
<entry>Waiting for other Parallel Hash participants to finish inserting tuples into new buckets.</entry>
</row>
<row>
<entry><literal>LogicalSyncData</literal></entry>
<entry>Waiting for logical replication remote server to send data for initial table synchronization.</entry>
......
......@@ -31,6 +31,7 @@
#include "executor/nodeCustom.h"
#include "executor/nodeForeignscan.h"
#include "executor/nodeHash.h"
#include "executor/nodeHashjoin.h"
#include "executor/nodeIndexscan.h"
#include "executor/nodeIndexonlyscan.h"
#include "executor/nodeSeqscan.h"
......@@ -266,6 +267,11 @@ ExecParallelEstimate(PlanState *planstate, ExecParallelEstimateContext *e)
ExecBitmapHeapEstimate((BitmapHeapScanState *) planstate,
e->pcxt);
break;
case T_HashJoinState:
if (planstate->plan->parallel_aware)
ExecHashJoinEstimate((HashJoinState *) planstate,
e->pcxt);
break;
case T_HashState:
/* even when not parallel-aware, for EXPLAIN ANALYZE */
ExecHashEstimate((HashState *) planstate, e->pcxt);
......@@ -474,6 +480,11 @@ ExecParallelInitializeDSM(PlanState *planstate,
ExecBitmapHeapInitializeDSM((BitmapHeapScanState *) planstate,
d->pcxt);
break;
case T_HashJoinState:
if (planstate->plan->parallel_aware)
ExecHashJoinInitializeDSM((HashJoinState *) planstate,
d->pcxt);
break;
case T_HashState:
/* even when not parallel-aware, for EXPLAIN ANALYZE */
ExecHashInitializeDSM((HashState *) planstate, d->pcxt);
......@@ -898,6 +909,11 @@ ExecParallelReInitializeDSM(PlanState *planstate,
ExecBitmapHeapReInitializeDSM((BitmapHeapScanState *) planstate,
pcxt);
break;
case T_HashJoinState:
if (planstate->plan->parallel_aware)
ExecHashJoinReInitializeDSM((HashJoinState *) planstate,
pcxt);
break;
case T_HashState:
case T_SortState:
/* these nodes have DSM state, but no reinitialization is required */
......@@ -1196,6 +1212,11 @@ ExecParallelInitializeWorker(PlanState *planstate, ParallelWorkerContext *pwcxt)
ExecBitmapHeapInitializeWorker((BitmapHeapScanState *) planstate,
pwcxt);
break;
case T_HashJoinState:
if (planstate->plan->parallel_aware)
ExecHashJoinInitializeWorker((HashJoinState *) planstate,
pwcxt);
break;
case T_HashState:
/* even when not parallel-aware, for EXPLAIN ANALYZE */
ExecHashInitializeWorker((HashState *) planstate, pwcxt);
......
......@@ -770,6 +770,9 @@ ExecShutdownNode(PlanState *node)
case T_HashState:
ExecShutdownHash((HashState *) node);
break;
case T_HashJoinState:
ExecShutdownHashJoin((HashJoinState *) node);
break;
default:
break;
}
......
This diff is collapsed.
This diff is collapsed.
......@@ -1057,6 +1057,7 @@ _copyHash(const Hash *from)
COPY_SCALAR_FIELD(skewTable);
COPY_SCALAR_FIELD(skewColumn);
COPY_SCALAR_FIELD(skewInherit);
COPY_SCALAR_FIELD(rows_total);
return newnode;
}
......
......@@ -927,6 +927,7 @@ _outHash(StringInfo str, const Hash *node)
WRITE_OID_FIELD(skewTable);
WRITE_INT_FIELD(skewColumn);
WRITE_BOOL_FIELD(skewInherit);
WRITE_FLOAT_FIELD(rows_total, "%.0f");
}
static void
......
......@@ -2213,6 +2213,7 @@ _readHash(void)
READ_OID_FIELD(skewTable);
READ_INT_FIELD(skewColumn);
READ_BOOL_FIELD(skewInherit);
READ_FLOAT_FIELD(rows_total);
READ_DONE();
}
......
......@@ -129,6 +129,7 @@ bool enable_hashjoin = true;
bool enable_gathermerge = true;
bool enable_partition_wise_join = false;
bool enable_parallel_append = true;
bool enable_parallel_hash = true;
typedef struct
{
......@@ -3130,16 +3131,19 @@ initial_cost_hashjoin(PlannerInfo *root, JoinCostWorkspace *workspace,
JoinType jointype,
List *hashclauses,
Path *outer_path, Path *inner_path,
JoinPathExtraData *extra)
JoinPathExtraData *extra,
bool parallel_hash)
{
Cost startup_cost = 0;
Cost run_cost = 0;
double outer_path_rows = outer_path->rows;
double inner_path_rows = inner_path->rows;
double inner_path_rows_total = inner_path_rows;
int num_hashclauses = list_length(hashclauses);
int numbuckets;
int numbatches;
int num_skew_mcvs;
size_t space_allowed; /* unused */
/* cost of source data */
startup_cost += outer_path->startup_cost;
......@@ -3160,6 +3164,15 @@ initial_cost_hashjoin(PlannerInfo *root, JoinCostWorkspace *workspace,
* inner_path_rows;
run_cost += cpu_operator_cost * num_hashclauses * outer_path_rows;
/*
* If this is a parallel hash build, then the value we have for
* inner_rows_total currently refers only to the rows returned by each
* participant. For shared hash table size estimation, we need the total
* number, so we need to undo the division.
*/
if (parallel_hash)
inner_path_rows_total *= get_parallel_divisor(inner_path);
/*
* Get hash table size that executor would use for inner relation.
*
......@@ -3170,9 +3183,12 @@ initial_cost_hashjoin(PlannerInfo *root, JoinCostWorkspace *workspace,
* XXX at some point it might be interesting to try to account for skew
* optimization in the cost estimate, but for now, we don't.
*/
ExecChooseHashTableSize(inner_path_rows,
ExecChooseHashTableSize(inner_path_rows_total,
inner_path->pathtarget->width,
true, /* useskew */
parallel_hash, /* try_combined_work_mem */
outer_path->parallel_workers,
&space_allowed,
&numbuckets,
&numbatches,
&num_skew_mcvs);
......@@ -3204,6 +3220,7 @@ initial_cost_hashjoin(PlannerInfo *root, JoinCostWorkspace *workspace,
workspace->run_cost = run_cost;
workspace->numbuckets = numbuckets;
workspace->numbatches = numbatches;
workspace->inner_rows_total = inner_path_rows_total;
}
/*
......@@ -3226,6 +3243,7 @@ final_cost_hashjoin(PlannerInfo *root, HashPath *path,
Path *inner_path = path->jpath.innerjoinpath;
double outer_path_rows = outer_path->rows;
double inner_path_rows = inner_path->rows;
double inner_path_rows_total = workspace->inner_rows_total;
List *hashclauses = path->path_hashclauses;
Cost startup_cost = workspace->startup_cost;
Cost run_cost = workspace->run_cost;
......@@ -3266,6 +3284,9 @@ final_cost_hashjoin(PlannerInfo *root, HashPath *path,
/* mark the path with estimated # of batches */
path->num_batches = numbatches;
/* store the total number of tuples (sum of partial row estimates) */
path->inner_rows_total = inner_path_rows_total;
/* and compute the number of "virtual" buckets in the whole join */
virtualbuckets = (double) numbuckets * (double) numbatches;
......
......@@ -747,7 +747,7 @@ try_hashjoin_path(PlannerInfo *root,
* never have any output pathkeys, per comments in create_hashjoin_path.
*/
initial_cost_hashjoin(root, &workspace, jointype, hashclauses,
outer_path, inner_path, extra);
outer_path, inner_path, extra, false);
if (add_path_precheck(joinrel,
workspace.startup_cost, workspace.total_cost,
......@@ -761,6 +761,7 @@ try_hashjoin_path(PlannerInfo *root,
extra,
outer_path,
inner_path,
false, /* parallel_hash */
extra->restrictlist,
required_outer,
hashclauses));
......@@ -776,6 +777,10 @@ try_hashjoin_path(PlannerInfo *root,
* try_partial_hashjoin_path
* Consider a partial hashjoin join path; if it appears useful, push it into
* the joinrel's partial_pathlist via add_partial_path().
* The outer side is partial. If parallel_hash is true, then the inner path
* must be partial and will be run in parallel to create one or more shared
* hash tables; otherwise the inner path must be complete and a copy of it
* is run in every process to create separate identical private hash tables.
*/
static void
try_partial_hashjoin_path(PlannerInfo *root,
......@@ -784,7 +789,8 @@ try_partial_hashjoin_path(PlannerInfo *root,
Path *inner_path,
List *hashclauses,
JoinType jointype,
JoinPathExtraData *extra)
JoinPathExtraData *extra,
bool parallel_hash)
{
JoinCostWorkspace workspace;
......@@ -808,7 +814,7 @@ try_partial_hashjoin_path(PlannerInfo *root,
* cost. Bail out right away if it looks terrible.
*/
initial_cost_hashjoin(root, &workspace, jointype, hashclauses,
outer_path, inner_path, extra);
outer_path, inner_path, extra, true);
if (!add_partial_path_precheck(joinrel, workspace.total_cost, NIL))
return;
......@@ -821,6 +827,7 @@ try_partial_hashjoin_path(PlannerInfo *root,
extra,
outer_path,
inner_path,
parallel_hash,
extra->restrictlist,
NULL,
hashclauses));
......@@ -1839,6 +1846,10 @@ hash_inner_and_outer(PlannerInfo *root,
* able to properly guarantee uniqueness. Similarly, we can't handle
* JOIN_FULL and JOIN_RIGHT, because they can produce false null
* extended rows. Also, the resulting path must not be parameterized.
* We would be able to support JOIN_FULL and JOIN_RIGHT for Parallel
* Hash, since in that case we're back to a single hash table with a
* single set of match bits for each batch, but that will require
* figuring out a deadlock-free way to wait for the probe to finish.
*/
if (joinrel->consider_parallel &&
save_jointype != JOIN_UNIQUE_OUTER &&
......@@ -1848,11 +1859,27 @@ hash_inner_and_outer(PlannerInfo *root,
bms_is_empty(joinrel->lateral_relids))
{
Path *cheapest_partial_outer;
Path *cheapest_partial_inner = NULL;
Path *cheapest_safe_inner = NULL;
cheapest_partial_outer =
(Path *) linitial(outerrel->partial_pathlist);
/*
* Can we use a partial inner plan too, so that we can build a
* shared hash table in parallel?
*/
if (innerrel->partial_pathlist != NIL && enable_parallel_hash)
{
cheapest_partial_inner =
(Path *) linitial(innerrel->partial_pathlist);
try_partial_hashjoin_path(root, joinrel,
cheapest_partial_outer,
cheapest_partial_inner,
hashclauses, jointype, extra,
true /* parallel_hash */ );
}
/*
* Normally, given that the joinrel is parallel-safe, the cheapest
* total inner path will also be parallel-safe, but if not, we'll
......@@ -1870,7 +1897,8 @@ hash_inner_and_outer(PlannerInfo *root,
try_partial_hashjoin_path(root, joinrel,
cheapest_partial_outer,
cheapest_safe_inner,
hashclauses, jointype, extra);
hashclauses, jointype, extra,
false /* parallel_hash */ );
}
}
}
......
......@@ -4192,6 +4192,17 @@ create_hashjoin_plan(PlannerInfo *root,
copy_plan_costsize(&hash_plan->plan, inner_plan);
hash_plan->plan.startup_cost = hash_plan->plan.total_cost;
/*
* If parallel-aware, the executor will also need an estimate of the total
* number of rows expected from all participants so that it can size the
* shared hash table.
*/
if (best_path->jpath.path.parallel_aware)
{
hash_plan->plan.parallel_aware = true;
hash_plan->rows_total = best_path->inner_rows_total;
}
join_plan = make_hashjoin(tlist,
joinclauses,
otherclauses,
......
......@@ -2278,6 +2278,7 @@ create_mergejoin_path(PlannerInfo *root,
* 'extra' contains various information about the join
* 'outer_path' is the cheapest outer path
* 'inner_path' is the cheapest inner path
* 'parallel_hash' to select Parallel Hash of inner path (shared hash table)
* 'restrict_clauses' are the RestrictInfo nodes to apply at the join
* 'required_outer' is the set of required outer rels
* 'hashclauses' are the RestrictInfo nodes to use as hash clauses
......@@ -2291,6 +2292,7 @@ create_hashjoin_path(PlannerInfo *root,
JoinPathExtraData *extra,
Path *outer_path,
Path *inner_path,
bool parallel_hash,
List *restrict_clauses,
Relids required_outer,
List *hashclauses)
......@@ -2308,7 +2310,8 @@ create_hashjoin_path(PlannerInfo *root,
extra->sjinfo,
required_outer,
&restrict_clauses);
pathnode->jpath.path.parallel_aware = false;
pathnode->jpath.path.parallel_aware =
joinrel->consider_parallel && parallel_hash;
pathnode->jpath.path.parallel_safe = joinrel->consider_parallel &&
outer_path->parallel_safe && inner_path->parallel_safe;
/* This is a foolish way to estimate parallel_workers, but for now... */
......
......@@ -3586,6 +3586,51 @@ pgstat_get_wait_ipc(WaitEventIPC w)
case WAIT_EVENT_EXECUTE_GATHER:
event_name = "ExecuteGather";
break;
case WAIT_EVENT_HASH_BATCH_ALLOCATING:
event_name = "Hash/Batch/Allocating";
break;
case WAIT_EVENT_HASH_BATCH_ELECTING:
event_name = "Hash/Batch/Electing";
break;
case WAIT_EVENT_HASH_BATCH_LOADING:
event_name = "Hash/Batch/Loading";
break;
case WAIT_EVENT_HASH_BUILD_ALLOCATING:
event_name = "Hash/Build/Allocating";
break;
case WAIT_EVENT_HASH_BUILD_ELECTING:
event_name = "Hash/Build/Electing";
break;
case WAIT_EVENT_HASH_BUILD_HASHING_INNER:
event_name = "Hash/Build/HashingInner";
break;
case WAIT_EVENT_HASH_BUILD_HASHING_OUTER:
event_name = "Hash/Build/HashingOuter";
break;
case WAIT_EVENT_HASH_GROW_BATCHES_ALLOCATING:
event_name = "Hash/GrowBatches/Allocating";
break;
case WAIT_EVENT_HASH_GROW_BATCHES_DECIDING:
event_name = "Hash/GrowBatches/Deciding";
break;
case WAIT_EVENT_HASH_GROW_BATCHES_ELECTING:
event_name = "Hash/GrowBatches/Electing";
break;
case WAIT_EVENT_HASH_GROW_BATCHES_FINISHING:
event_name = "Hash/GrowBatches/Finishing";
break;
case WAIT_EVENT_HASH_GROW_BATCHES_REPARTITIONING:
event_name = "Hash/GrowBatches/Repartitioning";
break;
case WAIT_EVENT_HASH_GROW_BUCKETS_ALLOCATING:
event_name = "Hash/GrowBuckets/Allocating";
break;
case WAIT_EVENT_HASH_GROW_BUCKETS_ELECTING:
event_name = "Hash/GrowBuckets/Electing";
break;
case WAIT_EVENT_HASH_GROW_BUCKETS_REINSERTING:
event_name = "Hash/GrowBuckets/Reinserting";
break;
case WAIT_EVENT_LOGICAL_SYNC_DATA:
event_name = "LogicalSyncData";
break;
......
......@@ -929,7 +929,15 @@ static struct config_bool ConfigureNamesBool[] =
true,
NULL, NULL, NULL
},
{
{"enable_parallel_hash", PGC_USERSET, QUERY_TUNING_METHOD,
gettext_noop("Enables the planner's user of parallel hash plans."),
NULL
},
&enable_parallel_hash,
true,
NULL, NULL, NULL
},
{
{"geqo", PGC_USERSET, QUERY_TUNING_GEQO,
gettext_noop("Enables genetic query optimization."),
......
......@@ -301,6 +301,7 @@
#enable_sort = on
#enable_tidscan = on
#enable_partition_wise_join = off
#enable_parallel_hash = on
# - Planner Cost Constants -
......
......@@ -15,7 +15,10 @@
#define HASHJOIN_H
#include "nodes/execnodes.h"
#include "port/atomics.h"
#include "storage/barrier.h"
#include "storage/buffile.h"
#include "storage/lwlock.h"
/* ----------------------------------------------------------------
* hash-join hash table structures
......@@ -63,7 +66,12 @@
typedef struct HashJoinTupleData
{
struct HashJoinTupleData *next; /* link to next tuple in same bucket */
/* link to next tuple in same bucket */
union
{
struct HashJoinTupleData *unshared;
dsa_pointer shared;
} next;
uint32 hashvalue; /* tuple's hash code */
/* Tuple data, in MinimalTuple format, follows on a MAXALIGN boundary */
} HashJoinTupleData;
......@@ -112,8 +120,12 @@ typedef struct HashMemoryChunkData
size_t maxlen; /* size of the buffer holding the tuples */
size_t used; /* number of buffer bytes already used */
struct HashMemoryChunkData *next; /* pointer to the next chunk (linked
* list) */
/* pointer to the next chunk (linked list) */
union
{
struct HashMemoryChunkData *unshared;
dsa_pointer shared;
} next;
char data[FLEXIBLE_ARRAY_MEMBER]; /* buffer allocated at the end */
} HashMemoryChunkData;
......@@ -121,8 +133,148 @@ typedef struct HashMemoryChunkData
typedef struct HashMemoryChunkData *HashMemoryChunk;
#define HASH_CHUNK_SIZE (32 * 1024L)
#define HASH_CHUNK_HEADER_SIZE (offsetof(HashMemoryChunkData, data))
#define HASH_CHUNK_THRESHOLD (HASH_CHUNK_SIZE / 4)
/*
* For each batch of a Parallel Hash Join, we have a ParallelHashJoinBatch
* object in shared memory to coordinate access to it. Since they are
* followed by variable-sized objects, they are arranged in contiguous memory
* but not accessed directly as an array.
*/
typedef struct ParallelHashJoinBatch
{
dsa_pointer buckets; /* array of hash table buckets */
Barrier batch_barrier; /* synchronization for joining this batch */
dsa_pointer chunks; /* chunks of tuples loaded */
size_t size; /* size of buckets + chunks in memory */
size_t estimated_size; /* size of buckets + chunks while writing */
size_t ntuples; /* number of tuples loaded */
size_t old_ntuples; /* number of tuples before repartitioning */
bool space_exhausted;
/*
* Variable-sized SharedTuplestore objects follow this struct in memory.
* See the accessor macros below.
*/
} ParallelHashJoinBatch;
/* Accessor for inner batch tuplestore following a ParallelHashJoinBatch. */
#define ParallelHashJoinBatchInner(batch) \
((SharedTuplestore *) \
((char *) (batch) + MAXALIGN(sizeof(ParallelHashJoinBatch))))
/* Accessor for outer batch tuplestore following a ParallelHashJoinBatch. */
#define ParallelHashJoinBatchOuter(batch, nparticipants) \
((SharedTuplestore *) \
((char *) ParallelHashJoinBatchInner(batch) + \
MAXALIGN(sts_estimate(nparticipants))))
/* Total size of a ParallelHashJoinBatch and tuplestores. */
#define EstimateParallelHashJoinBatch(hashtable) \
(MAXALIGN(sizeof(ParallelHashJoinBatch)) + \
MAXALIGN(sts_estimate((hashtable)->parallel_state->nparticipants)) * 2)
/* Accessor for the nth ParallelHashJoinBatch given the base. */
#define NthParallelHashJoinBatch(base, n) \
((ParallelHashJoinBatch *) \
((char *) (base) + \
EstimateParallelHashJoinBatch(hashtable) * (n)))
/*
* Each backend requires a small amount of per-batch state to interact with
* each ParalellHashJoinBatch.
*/
typedef struct ParallelHashJoinBatchAccessor
{
ParallelHashJoinBatch *shared; /* pointer to shared state */
/* Per-backend partial counters to reduce contention. */
size_t preallocated; /* pre-allocated space for this backend */
size_t ntuples; /* number of tuples */
size_t size; /* size of partition in memory */
size_t estimated_size; /* size of partition on disk */
size_t old_ntuples; /* how many tuples before repartioning? */
bool at_least_one_chunk; /* has this backend allocated a chunk? */
bool done; /* flag to remember that a batch is done */
SharedTuplestoreAccessor *inner_tuples;
SharedTuplestoreAccessor *outer_tuples;
} ParallelHashJoinBatchAccessor;
/*
* While hashing the inner relation, any participant might determine that it's
* time to increase the number of buckets to reduce the load factor or batches
* to reduce the memory size. This is indicated by setting the growth flag to
* these values.
*/
typedef enum ParallelHashGrowth
{
/* The current dimensions are sufficient. */
PHJ_GROWTH_OK,
/* The load factor is too high, so we need to add buckets. */
PHJ_GROWTH_NEED_MORE_BUCKETS,
/* The memory budget would be exhausted, so we need to repartition. */
PHJ_GROWTH_NEED_MORE_BATCHES,
/* Repartitioning didn't help last time, so don't try to do that again. */
PHJ_GROWTH_DISABLED
} ParallelHashGrowth;
/*
* The shared state used to coordinate a Parallel Hash Join. This is stored
* in the DSM segment.
*/
typedef struct ParallelHashJoinState
{
dsa_pointer batches; /* array of ParallelHashJoinBatch */
dsa_pointer old_batches; /* previous generation during repartition */
int nbatch; /* number of batches now */
int old_nbatch; /* previous number of batches */
int nbuckets; /* number of buckets */
ParallelHashGrowth growth; /* control batch/bucket growth */
dsa_pointer chunk_work_queue; /* chunk work queue */
int nparticipants;
size_t space_allowed;
size_t total_tuples; /* total number of inner tuples */
LWLock lock; /* lock protecting the above */
Barrier build_barrier; /* synchronization for the build phases */
Barrier grow_batches_barrier;
Barrier grow_buckets_barrier;
pg_atomic_uint32 distributor; /* counter for load balancing */
SharedFileSet fileset; /* space for shared temporary files */
} ParallelHashJoinState;
/* The phases for building batches, used by build_barrier. */
#define PHJ_BUILD_ELECTING 0
#define PHJ_BUILD_ALLOCATING 1
#define PHJ_BUILD_HASHING_INNER 2
#define PHJ_BUILD_HASHING_OUTER 3
#define PHJ_BUILD_DONE 4
/* The phases for probing each batch, used by for batch_barrier. */
#define PHJ_BATCH_ELECTING 0
#define PHJ_BATCH_ALLOCATING 1
#define PHJ_BATCH_LOADING 2
#define PHJ_BATCH_PROBING 3
#define PHJ_BATCH_DONE 4
/* The phases of batch growth while hashing, for grow_batches_barrier. */
#define PHJ_GROW_BATCHES_ELECTING 0
#define PHJ_GROW_BATCHES_ALLOCATING 1
#define PHJ_GROW_BATCHES_REPARTITIONING 2
#define PHJ_GROW_BATCHES_DECIDING 3
#define PHJ_GROW_BATCHES_FINISHING 4
#define PHJ_GROW_BATCHES_PHASE(n) ((n) % 5) /* circular phases */
/* The phases of bucket growth while hashing, for grow_buckets_barrier. */
#define PHJ_GROW_BUCKETS_ELECTING 0
#define PHJ_GROW_BUCKETS_ALLOCATING 1
#define PHJ_GROW_BUCKETS_REINSERTING 2
#define PHJ_GROW_BUCKETS_PHASE(n) ((n) % 3) /* circular phases */
typedef struct HashJoinTableData
{
int nbuckets; /* # buckets in the in-memory hash table */
......@@ -133,8 +285,13 @@ typedef struct HashJoinTableData
int log2_nbuckets_optimal; /* log2(nbuckets_optimal) */
/* buckets[i] is head of list of tuples in i'th in-memory bucket */
struct HashJoinTupleData **buckets;
/* buckets array is per-batch storage, as are all the tuples */
union
{
/* unshared array is per-batch storage, as are all the tuples */
struct HashJoinTupleData **unshared;
/* shared array is per-query DSA area, as are all the tuples */
dsa_pointer_atomic *shared;
} buckets;
bool keepNulls; /* true to store unmatchable NULL tuples */
......@@ -153,6 +310,7 @@ typedef struct HashJoinTableData
bool growEnabled; /* flag to shut off nbatch increases */
double totalTuples; /* # tuples obtained from inner plan */
double partialTuples; /* # tuples obtained from inner plan by me */
double skewTuples; /* # tuples inserted into skew tuples */
/*
......@@ -185,6 +343,13 @@ typedef struct HashJoinTableData
/* used for dense allocation of tuples (into linked chunks) */
HashMemoryChunk chunks; /* one list for the whole batch */
/* Shared and private state for Parallel Hash. */
HashMemoryChunk current_chunk; /* this backend's current chunk */
dsa_area *area; /* DSA area to allocate memory from */
ParallelHashJoinState *parallel_state;
ParallelHashJoinBatchAccessor *batches;
dsa_pointer current_chunk_shared;
} HashJoinTableData;
#endif /* HASHJOIN_H */
......@@ -17,17 +17,33 @@
#include "access/parallel.h"
#include "nodes/execnodes.h"
struct SharedHashJoinBatch;
extern HashState *ExecInitHash(Hash *node, EState *estate, int eflags);
extern Node *MultiExecHash(HashState *node);
extern void ExecEndHash(HashState *node);
extern void ExecReScanHash(HashState *node);
extern HashJoinTable ExecHashTableCreate(Hash *node, List *hashOperators,
extern HashJoinTable ExecHashTableCreate(HashState *state, List *hashOperators,
bool keepNulls);
extern void ExecParallelHashTableAlloc(HashJoinTable hashtable,
int batchno);
extern void ExecHashTableDestroy(HashJoinTable hashtable);
extern void ExecHashTableDetach(HashJoinTable hashtable);
extern void ExecHashTableDetachBatch(HashJoinTable hashtable);
extern void ExecParallelHashTableSetCurrentBatch(HashJoinTable hashtable,
int batchno);
void ExecParallelHashUpdateSpacePeak(HashJoinTable hashtable, int batchno);
extern void ExecHashTableInsert(HashJoinTable hashtable,
TupleTableSlot *slot,
uint32 hashvalue);
extern void ExecParallelHashTableInsert(HashJoinTable hashtable,
TupleTableSlot *slot,
uint32 hashvalue);
extern void ExecParallelHashTableInsertCurrentBatch(HashJoinTable hashtable,
TupleTableSlot *slot,
uint32 hashvalue);
extern bool ExecHashGetHashValue(HashJoinTable hashtable,
ExprContext *econtext,
List *hashkeys,
......@@ -39,12 +55,16 @@ extern void ExecHashGetBucketAndBatch(HashJoinTable hashtable,
int *bucketno,
int *batchno);
extern bool ExecScanHashBucket(HashJoinState *hjstate, ExprContext *econtext);
extern bool ExecParallelScanHashBucket(HashJoinState *hjstate, ExprContext *econtext);
extern void ExecPrepHashTableForUnmatched(HashJoinState *hjstate);
extern bool ExecScanHashTableForUnmatched(HashJoinState *hjstate,
ExprContext *econtext);
extern void ExecHashTableReset(HashJoinTable hashtable);
extern void ExecHashTableResetMatchFlags(HashJoinTable hashtable);
extern void ExecChooseHashTableSize(double ntuples, int tupwidth, bool useskew,
bool try_combined_work_mem,
int parallel_workers,
size_t *space_allowed,
int *numbuckets,
int *numbatches,
int *num_skew_mcvs);
......@@ -55,6 +75,6 @@ extern void ExecHashInitializeWorker(HashState *node, ParallelWorkerContext *pwc
extern void ExecHashRetrieveInstrumentation(HashState *node);
extern void ExecShutdownHash(HashState *node);
extern void ExecHashGetInstrumentation(HashInstrumentation *instrument,
HashJoinTable hashtable);
HashJoinTable hashtable);
#endif /* NODEHASH_H */
......@@ -20,6 +20,12 @@
extern HashJoinState *ExecInitHashJoin(HashJoin *node, EState *estate, int eflags);
extern void ExecEndHashJoin(HashJoinState *node);
extern void ExecReScanHashJoin(HashJoinState *node);
extern void ExecShutdownHashJoin(HashJoinState *node);
extern void ExecHashJoinEstimate(HashJoinState *state, ParallelContext *pcxt);
extern void ExecHashJoinInitializeDSM(HashJoinState *state, ParallelContext *pcxt);
extern void ExecHashJoinReInitializeDSM(HashJoinState *state, ParallelContext *pcxt);
extern void ExecHashJoinInitializeWorker(HashJoinState *state,
ParallelWorkerContext *pwcxt);
extern void ExecHashJoinSaveTuple(MinimalTuple tuple, uint32 hashvalue,
BufFile **fileptr);
......
......@@ -25,6 +25,7 @@
#include "utils/hsearch.h"
#include "utils/queryenvironment.h"
#include "utils/reltrigger.h"
#include "utils/sharedtuplestore.h"
#include "utils/sortsupport.h"
#include "utils/tuplestore.h"
#include "utils/tuplesort.h"
......@@ -43,6 +44,8 @@ struct ExprState; /* forward references in this file */
struct ExprContext;
struct ExprEvalStep; /* avoid including execExpr.h everywhere */
struct ParallelHashJoinState;
typedef Datum (*ExprStateEvalFunc) (struct ExprState *expression,
struct ExprContext *econtext,
bool *isNull);
......@@ -2026,6 +2029,9 @@ typedef struct HashState
SharedHashInfo *shared_info; /* one entry per worker */
HashInstrumentation *hinstrument; /* this worker's entry */
/* Parallel hash state. */
struct ParallelHashJoinState *parallel_state;
} HashState;
/* ----------------
......
......@@ -880,6 +880,7 @@ typedef struct Hash
AttrNumber skewColumn; /* outer join key's column #, or zero */
bool skewInherit; /* is outer join rel an inheritance tree? */
/* all other info is in the parent HashJoin node */
double rows_total; /* estimate total rows if parallel_aware */
} Hash;
/* ----------------
......
......@@ -1464,6 +1464,7 @@ typedef struct HashPath
JoinPath jpath;
List *path_hashclauses; /* join clauses used for hashing */
int num_batches; /* number of batches expected */
double inner_rows_total; /* total inner rows expected */
} HashPath;
/*
......@@ -2315,6 +2316,7 @@ typedef struct JoinCostWorkspace
/* private for cost_hashjoin code */
int numbuckets;
int numbatches;
double inner_rows_total;
} JoinCostWorkspace;
#endif /* RELATION_H */
......@@ -69,6 +69,7 @@ extern bool enable_hashjoin;
extern bool enable_gathermerge;
extern bool enable_partition_wise_join;
extern bool enable_parallel_append;
extern bool enable_parallel_hash;
extern int constraint_exclusion;
extern double clamp_row_est(double nrows);
......@@ -153,7 +154,8 @@ extern void initial_cost_hashjoin(PlannerInfo *root,
JoinType jointype,
List *hashclauses,
Path *outer_path, Path *inner_path,
JoinPathExtraData *extra);
JoinPathExtraData *extra,
bool parallel_hash);
extern void final_cost_hashjoin(PlannerInfo *root, HashPath *path,
JoinCostWorkspace *workspace,
JoinPathExtraData *extra);
......
......@@ -153,6 +153,7 @@ extern HashPath *create_hashjoin_path(PlannerInfo *root,
JoinPathExtraData *extra,
Path *outer_path,
Path *inner_path,
bool parallel_hash,
List *restrict_clauses,
Relids required_outer,
List *hashclauses);
......
......@@ -803,6 +803,21 @@ typedef enum
WAIT_EVENT_BGWORKER_STARTUP,
WAIT_EVENT_BTREE_PAGE,
WAIT_EVENT_EXECUTE_GATHER,
WAIT_EVENT_HASH_BATCH_ALLOCATING,
WAIT_EVENT_HASH_BATCH_ELECTING,
WAIT_EVENT_HASH_BATCH_LOADING,
WAIT_EVENT_HASH_BUILD_ALLOCATING,
WAIT_EVENT_HASH_BUILD_ELECTING,
WAIT_EVENT_HASH_BUILD_HASHING_INNER,
WAIT_EVENT_HASH_BUILD_HASHING_OUTER,
WAIT_EVENT_HASH_GROW_BATCHES_ELECTING,
WAIT_EVENT_HASH_GROW_BATCHES_FINISHING,
WAIT_EVENT_HASH_GROW_BATCHES_REPARTITIONING,
WAIT_EVENT_HASH_GROW_BATCHES_ALLOCATING,
WAIT_EVENT_HASH_GROW_BATCHES_DECIDING,
WAIT_EVENT_HASH_GROW_BUCKETS_ELECTING,
WAIT_EVENT_HASH_GROW_BUCKETS_REINSERTING,
WAIT_EVENT_HASH_GROW_BUCKETS_ALLOCATING,
WAIT_EVENT_LOGICAL_SYNC_DATA,
WAIT_EVENT_LOGICAL_SYNC_STATE_CHANGE,
WAIT_EVENT_MQ_INTERNAL,
......
......@@ -211,6 +211,7 @@ typedef enum BuiltinTrancheIds
LWTRANCHE_BUFFER_MAPPING,
LWTRANCHE_LOCK_MANAGER,
LWTRANCHE_PREDICATE_LOCK_MANAGER,
LWTRANCHE_PARALLEL_HASH_JOIN,
LWTRANCHE_PARALLEL_QUERY_DSA,
LWTRANCHE_SESSION_DSA,
LWTRANCHE_SESSION_RECORD_TABLE,
......
This diff is collapsed.
......@@ -82,11 +82,12 @@ select name, setting from pg_settings where name like 'enable%';
enable_mergejoin | on
enable_nestloop | on
enable_parallel_append | on
enable_parallel_hash | on
enable_partition_wise_join | off
enable_seqscan | on
enable_sort | on
enable_tidscan | on
(14 rows)
(15 rows)
-- Test that the pg_timezone_names and pg_timezone_abbrevs views are
-- more-or-less working. We can't test their contents in any great detail
......
......@@ -2028,6 +2028,10 @@ update pg_class
set reltuples = 2, relpages = pg_relation_size('extremely_skewed') / 8192
where relname = 'extremely_skewed';
-- Make a relation with a couple of enormous tuples.
create table wide as select generate_series(1, 2) as id, rpad('', 320000, 'x') as t;
alter table wide set (parallel_workers = 2);
-- The "optimal" case: the hash table fits in memory; we plan for 1
-- batch, we stick to that number, and peak memory usage stays within
-- our work_mem budget
......@@ -2050,6 +2054,22 @@ rollback to settings;
savepoint settings;
set local max_parallel_workers_per_gather = 2;
set local work_mem = '4MB';
set local enable_parallel_hash = off;
explain (costs off)
select count(*) from simple r join simple s using (id);
select count(*) from simple r join simple s using (id);
select original > 1 as initially_multibatch, final > original as increased_batches
from hash_join_batches(
$$
select count(*) from simple r join simple s using (id);
$$);
rollback to settings;
-- parallel with parallel-aware hash join
savepoint settings;
set local max_parallel_workers_per_gather = 2;
set local work_mem = '4MB';
set local enable_parallel_hash = on;
explain (costs off)
select count(*) from simple r join simple s using (id);
select count(*) from simple r join simple s using (id);
......@@ -2082,6 +2102,22 @@ rollback to settings;
savepoint settings;
set local max_parallel_workers_per_gather = 2;
set local work_mem = '128kB';
set local enable_parallel_hash = off;
explain (costs off)
select count(*) from simple r join simple s using (id);
select count(*) from simple r join simple s using (id);
select original > 1 as initially_multibatch, final > original as increased_batches
from hash_join_batches(
$$
select count(*) from simple r join simple s using (id);
$$);
rollback to settings;
-- parallel with parallel-aware hash join
savepoint settings;
set local max_parallel_workers_per_gather = 2;
set local work_mem = '128kB';
set local enable_parallel_hash = on;
explain (costs off)
select count(*) from simple r join simple s using (id);
select count(*) from simple r join simple s using (id);
......@@ -2115,6 +2151,22 @@ rollback to settings;
savepoint settings;
set local max_parallel_workers_per_gather = 2;
set local work_mem = '128kB';
set local enable_parallel_hash = off;
explain (costs off)
select count(*) from simple r join bigger_than_it_looks s using (id);
select count(*) from simple r join bigger_than_it_looks s using (id);
select original > 1 as initially_multibatch, final > original as increased_batches
from hash_join_batches(
$$
select count(*) from simple r join bigger_than_it_looks s using (id);
$$);
rollback to settings;
-- parallel with parallel-aware hash join
savepoint settings;
set local max_parallel_workers_per_gather = 1;
set local work_mem = '192kB';
set local enable_parallel_hash = on;
explain (costs off)
select count(*) from simple r join bigger_than_it_looks s using (id);
select count(*) from simple r join bigger_than_it_looks s using (id);
......@@ -2148,6 +2200,21 @@ rollback to settings;
savepoint settings;
set local max_parallel_workers_per_gather = 2;
set local work_mem = '128kB';
set local enable_parallel_hash = off;
explain (costs off)
select count(*) from simple r join extremely_skewed s using (id);
select count(*) from simple r join extremely_skewed s using (id);
select * from hash_join_batches(
$$
select count(*) from simple r join extremely_skewed s using (id);
$$);
rollback to settings;
-- parallel with parallel-aware hash join
savepoint settings;
set local max_parallel_workers_per_gather = 1;
set local work_mem = '128kB';
set local enable_parallel_hash = on;
explain (costs off)
select count(*) from simple r join extremely_skewed s using (id);
select count(*) from simple r join extremely_skewed s using (id);
......@@ -2175,11 +2242,12 @@ rollback to settings;
create table foo as select generate_series(1, 3) as id, 'xxxxx'::text as t;
alter table foo set (parallel_workers = 0);
create table bar as select generate_series(1, 5000) as id, 'xxxxx'::text as t;
create table bar as select generate_series(1, 10000) as id, 'xxxxx'::text as t;
alter table bar set (parallel_workers = 2);
-- multi-batch with rescan, parallel-oblivious
savepoint settings;
set enable_parallel_hash = off;
set parallel_leader_participation = off;
set min_parallel_table_scan_size = 0;
set parallel_setup_cost = 0;
......@@ -2206,6 +2274,61 @@ rollback to settings;
-- single-batch with rescan, parallel-oblivious
savepoint settings;
set enable_parallel_hash = off;
set parallel_leader_participation = off;
set min_parallel_table_scan_size = 0;
set parallel_setup_cost = 0;
set parallel_tuple_cost = 0;
set max_parallel_workers_per_gather = 2;
set enable_material = off;
set enable_mergejoin = off;
set work_mem = '4MB';
explain (costs off)
select count(*) from foo
left join (select b1.id, b1.t from bar b1 join bar b2 using (id)) ss
on foo.id < ss.id + 1 and foo.id > ss.id - 1;
select count(*) from foo
left join (select b1.id, b1.t from bar b1 join bar b2 using (id)) ss
on foo.id < ss.id + 1 and foo.id > ss.id - 1;
select final > 1 as multibatch
from hash_join_batches(
$$
select count(*) from foo
left join (select b1.id, b1.t from bar b1 join bar b2 using (id)) ss
on foo.id < ss.id + 1 and foo.id > ss.id - 1;
$$);
rollback to settings;
-- multi-batch with rescan, parallel-aware
savepoint settings;
set enable_parallel_hash = on;
set parallel_leader_participation = off;
set min_parallel_table_scan_size = 0;
set parallel_setup_cost = 0;
set parallel_tuple_cost = 0;
set max_parallel_workers_per_gather = 2;
set enable_material = off;
set enable_mergejoin = off;
set work_mem = '64kB';
explain (costs off)
select count(*) from foo
left join (select b1.id, b1.t from bar b1 join bar b2 using (id)) ss
on foo.id < ss.id + 1 and foo.id > ss.id - 1;
select count(*) from foo
left join (select b1.id, b1.t from bar b1 join bar b2 using (id)) ss
on foo.id < ss.id + 1 and foo.id > ss.id - 1;
select final > 1 as multibatch
from hash_join_batches(
$$
select count(*) from foo
left join (select b1.id, b1.t from bar b1 join bar b2 using (id)) ss
on foo.id < ss.id + 1 and foo.id > ss.id - 1;
$$);
rollback to settings;
-- single-batch with rescan, parallel-aware
savepoint settings;
set enable_parallel_hash = on;
set parallel_leader_participation = off;
set min_parallel_table_scan_size = 0;
set parallel_setup_cost = 0;
......@@ -2266,4 +2389,27 @@ explain (costs off)
select count(*) from simple r full outer join simple s on (r.id = 0 - s.id);
rollback to settings;
-- exercise special code paths for huge tuples (note use of non-strict
-- expression and left join required to get the detoasted tuple into
-- the hash table)
-- parallel with parallel-aware hash join (hits ExecParallelHashLoadTuple and
-- sts_puttuple oversized tuple cases because it's multi-batch)
savepoint settings;
set max_parallel_workers_per_gather = 2;
set enable_parallel_hash = on;
set work_mem = '128kB';
explain (costs off)
select length(max(s.t))
from wide left join (select id, coalesce(t, '') || '' as t from wide) s using (id);
select length(max(s.t))
from wide left join (select id, coalesce(t, '') || '' as t from wide) s using (id);
select final > 1 as multibatch
from hash_join_batches(
$$
select length(max(s.t))
from wide left join (select id, coalesce(t, '') || '' as t from wide) s using (id);
$$);
rollback to settings;
rollback;
......@@ -1542,6 +1542,10 @@ ParallelBitmapHeapState
ParallelCompletionPtr
ParallelContext
ParallelExecutorInfo
ParallelHashGrowth
ParallelHashJoinBatch
ParallelHashJoinBatchAccessor
ParallelHashJoinState
ParallelHeapScanDesc
ParallelIndexScanDesc
ParallelSlot
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment