Commit 5bcf389e authored by Andres Freund's avatar Andres Freund

Fix EXPLAIN ANALYZE of hash join when the leader doesn't participate.

If a hash join appears in a parallel query, there may be no hash table
available for explain.c to inspect even though a hash table may have
been built in other processes.  This could happen either because
parallel_leader_participation was set to off or because the leader
happened to hit the end of the outer relation immediately (even though
the complete relation is not empty) and decided not to build the hash
table.

Commit bf11e7ee introduced a way for workers to exchange
instrumentation via the DSM segment for Sort nodes even though they
are not parallel-aware.  This commit does the same for Hash nodes, so
that explain.c has a way to find instrumentation data from an
arbitrary participant that actually built the hash table.

Author: Thomas Munro
Reviewed-By: Andres Freund
Discussion: https://postgr.es/m/CAEepm%3D3DUQC2-z252N55eOcZBer6DPdM%3DFzrxH9dZc5vYLsjaA%40mail.gmail.com
parent 82c5c533
......@@ -19,7 +19,7 @@
#include "commands/createas.h"
#include "commands/defrem.h"
#include "commands/prepare.h"
#include "executor/hashjoin.h"
#include "executor/nodeHash.h"
#include "foreign/fdwapi.h"
#include "nodes/extensible.h"
#include "nodes/nodeFuncs.h"
......@@ -2379,34 +2379,62 @@ show_sort_info(SortState *sortstate, ExplainState *es)
static void
show_hash_info(HashState *hashstate, ExplainState *es)
{
HashJoinTable hashtable;
HashInstrumentation *hinstrument = NULL;
hashtable = hashstate->hashtable;
/*
* In a parallel query, the leader process may or may not have run the
* hash join, and even if it did it may not have built a hash table due to
* timing (if it started late it might have seen no tuples in the outer
* relation and skipped building the hash table). Therefore we have to be
* prepared to get instrumentation data from a worker if there is no hash
* table.
*/
if (hashstate->hashtable)
{
hinstrument = (HashInstrumentation *)
palloc(sizeof(HashInstrumentation));
ExecHashGetInstrumentation(hinstrument, hashstate->hashtable);
}
else if (hashstate->shared_info)
{
SharedHashInfo *shared_info = hashstate->shared_info;
int i;
/* Find the first worker that built a hash table. */
for (i = 0; i < shared_info->num_workers; ++i)
{
if (shared_info->hinstrument[i].nbatch > 0)
{
hinstrument = &shared_info->hinstrument[i];
break;
}
}
}
if (hashtable)
if (hinstrument)
{
long spacePeakKb = (hashtable->spacePeak + 1023) / 1024;
long spacePeakKb = (hinstrument->space_peak + 1023) / 1024;
if (es->format != EXPLAIN_FORMAT_TEXT)
{
ExplainPropertyLong("Hash Buckets", hashtable->nbuckets, es);
ExplainPropertyLong("Hash Buckets", hinstrument->nbuckets, es);
ExplainPropertyLong("Original Hash Buckets",
hashtable->nbuckets_original, es);
ExplainPropertyLong("Hash Batches", hashtable->nbatch, es);
hinstrument->nbuckets_original, es);
ExplainPropertyLong("Hash Batches", hinstrument->nbatch, es);
ExplainPropertyLong("Original Hash Batches",
hashtable->nbatch_original, es);
hinstrument->nbatch_original, es);
ExplainPropertyLong("Peak Memory Usage", spacePeakKb, es);
}
else if (hashtable->nbatch_original != hashtable->nbatch ||
hashtable->nbuckets_original != hashtable->nbuckets)
else if (hinstrument->nbatch_original != hinstrument->nbatch ||
hinstrument->nbuckets_original != hinstrument->nbuckets)
{
appendStringInfoSpaces(es->str, es->indent * 2);
appendStringInfo(es->str,
"Buckets: %d (originally %d) Batches: %d (originally %d) Memory Usage: %ldkB\n",
hashtable->nbuckets,
hashtable->nbuckets_original,
hashtable->nbatch,
hashtable->nbatch_original,
hinstrument->nbuckets,
hinstrument->nbuckets_original,
hinstrument->nbatch,
hinstrument->nbatch_original,
spacePeakKb);
}
else
......@@ -2414,7 +2442,7 @@ show_hash_info(HashState *hashstate, ExplainState *es)
appendStringInfoSpaces(es->str, es->indent * 2);
appendStringInfo(es->str,
"Buckets: %d Batches: %d Memory Usage: %ldkB\n",
hashtable->nbuckets, hashtable->nbatch,
hinstrument->nbuckets, hinstrument->nbatch,
spacePeakKb);
}
}
......
......@@ -29,6 +29,7 @@
#include "executor/nodeBitmapHeapscan.h"
#include "executor/nodeCustom.h"
#include "executor/nodeForeignscan.h"
#include "executor/nodeHash.h"
#include "executor/nodeIndexscan.h"
#include "executor/nodeIndexonlyscan.h"
#include "executor/nodeSeqscan.h"
......@@ -259,8 +260,12 @@ ExecParallelEstimate(PlanState *planstate, ExecParallelEstimateContext *e)
ExecBitmapHeapEstimate((BitmapHeapScanState *) planstate,
e->pcxt);
break;
case T_HashState:
/* even when not parallel-aware, for EXPLAIN ANALYZE */
ExecHashEstimate((HashState *) planstate, e->pcxt);
break;
case T_SortState:
/* even when not parallel-aware */
/* even when not parallel-aware, for EXPLAIN ANALYZE */
ExecSortEstimate((SortState *) planstate, e->pcxt);
break;
......@@ -458,8 +463,12 @@ ExecParallelInitializeDSM(PlanState *planstate,
ExecBitmapHeapInitializeDSM((BitmapHeapScanState *) planstate,
d->pcxt);
break;
case T_HashState:
/* even when not parallel-aware, for EXPLAIN ANALYZE */
ExecHashInitializeDSM((HashState *) planstate, d->pcxt);
break;
case T_SortState:
/* even when not parallel-aware */
/* even when not parallel-aware, for EXPLAIN ANALYZE */
ExecSortInitializeDSM((SortState *) planstate, d->pcxt);
break;
......@@ -872,8 +881,12 @@ ExecParallelReInitializeDSM(PlanState *planstate,
ExecBitmapHeapReInitializeDSM((BitmapHeapScanState *) planstate,
pcxt);
break;
case T_HashState:
/* even when not parallel-aware, for EXPLAIN ANALYZE */
ExecHashReInitializeDSM((HashState *) planstate, pcxt);
break;
case T_SortState:
/* even when not parallel-aware */
/* even when not parallel-aware, for EXPLAIN ANALYZE */
ExecSortReInitializeDSM((SortState *) planstate, pcxt);
break;
......@@ -928,12 +941,18 @@ ExecParallelRetrieveInstrumentation(PlanState *planstate,
planstate->worker_instrument->num_workers = instrumentation->num_workers;
memcpy(&planstate->worker_instrument->instrument, instrument, ibytes);
/*
* Perform any node-type-specific work that needs to be done. Currently,
* only Sort nodes need to do anything here.
*/
if (IsA(planstate, SortState))
ExecSortRetrieveInstrumentation((SortState *) planstate);
/* Perform any node-type-specific work that needs to be done. */
switch (nodeTag(planstate))
{
case T_SortState:
ExecSortRetrieveInstrumentation((SortState *) planstate);
break;
case T_HashState:
ExecHashRetrieveInstrumentation((HashState *) planstate);
break;
default:
break;
}
return planstate_tree_walker(planstate, ExecParallelRetrieveInstrumentation,
instrumentation);
......@@ -1160,8 +1179,12 @@ ExecParallelInitializeWorker(PlanState *planstate, ParallelWorkerContext *pwcxt)
ExecBitmapHeapInitializeWorker((BitmapHeapScanState *) planstate,
pwcxt);
break;
case T_HashState:
/* even when not parallel-aware, for EXPLAIN ANALYZE */
ExecHashInitializeWorker((HashState *) planstate, pwcxt);
break;
case T_SortState:
/* even when not parallel-aware */
/* even when not parallel-aware, for EXPLAIN ANALYZE */
ExecSortInitializeWorker((SortState *) planstate, pwcxt);
break;
......
......@@ -751,6 +751,9 @@ ExecShutdownNode(PlanState *node)
case T_GatherMergeState:
ExecShutdownGatherMerge((GatherMergeState *) node);
break;
case T_HashState:
ExecShutdownHash((HashState *) node);
break;
default:
break;
}
......
......@@ -1637,6 +1637,110 @@ ExecHashRemoveNextSkewBucket(HashJoinTable hashtable)
}
}
/*
* Reserve space in the DSM segment for instrumentation data.
*/
void
ExecHashEstimate(HashState *node, ParallelContext *pcxt)
{
size_t size;
size = mul_size(pcxt->nworkers, sizeof(HashInstrumentation));
size = add_size(size, offsetof(SharedHashInfo, hinstrument));
shm_toc_estimate_chunk(&pcxt->estimator, size);
shm_toc_estimate_keys(&pcxt->estimator, 1);
}
/*
* Set up a space in the DSM for all workers to record instrumentation data
* about their hash table.
*/
void
ExecHashInitializeDSM(HashState *node, ParallelContext *pcxt)
{
size_t size;
size = offsetof(SharedHashInfo, hinstrument) +
pcxt->nworkers * sizeof(HashInstrumentation);
node->shared_info = (SharedHashInfo *) shm_toc_allocate(pcxt->toc, size);
memset(node->shared_info, 0, size);
node->shared_info->num_workers = pcxt->nworkers;
shm_toc_insert(pcxt->toc, node->ps.plan->plan_node_id,
node->shared_info);
}
/*
* Reset shared state before beginning a fresh scan.
*/
void
ExecHashReInitializeDSM(HashState *node, ParallelContext *pcxt)
{
if (node->shared_info != NULL)
{
memset(node->shared_info->hinstrument, 0,
node->shared_info->num_workers * sizeof(HashInstrumentation));
}
}
/*
* Locate the DSM space for hash table instrumentation data that we'll write
* to at shutdown time.
*/
void
ExecHashInitializeWorker(HashState *node, ParallelWorkerContext *pwcxt)
{
SharedHashInfo *shared_info;
shared_info = (SharedHashInfo *)
shm_toc_lookup(pwcxt->toc, node->ps.plan->plan_node_id, true);
node->hinstrument = &shared_info->hinstrument[ParallelWorkerNumber];
}
/*
* Copy instrumentation data from this worker's hash table (if it built one)
* to DSM memory so the leader can retrieve it. This must be done in an
* ExecShutdownHash() rather than ExecEndHash() because the latter runs after
* we've detached from the DSM segment.
*/
void
ExecShutdownHash(HashState *node)
{
if (node->hinstrument && node->hashtable)
ExecHashGetInstrumentation(node->hinstrument, node->hashtable);
}
/*
* Retrieve instrumentation data from workers before the DSM segment is
* detached, so that EXPLAIN can access it.
*/
void
ExecHashRetrieveInstrumentation(HashState *node)
{
SharedHashInfo *shared_info = node->shared_info;
size_t size;
/* Replace node->shared_info with a copy in backend-local memory. */
size = offsetof(SharedHashInfo, hinstrument) +
shared_info->num_workers * sizeof(HashInstrumentation);
node->shared_info = palloc(size);
memcpy(node->shared_info, shared_info, size);
}
/*
* Copy the instrumentation data from 'hashtable' into a HashInstrumentation
* struct.
*/
void
ExecHashGetInstrumentation(HashInstrumentation *instrument,
HashJoinTable hashtable)
{
instrument->nbuckets = hashtable->nbuckets;
instrument->nbuckets_original = hashtable->nbuckets_original;
instrument->nbatch = hashtable->nbatch;
instrument->nbatch_original = hashtable->nbatch_original;
instrument->space_peak = hashtable->spacePeak;
}
/*
* Allocate 'size' bytes from the currently active HashMemoryChunk
*/
......
......@@ -14,6 +14,7 @@
#ifndef NODEHASH_H
#define NODEHASH_H
#include "access/parallel.h"
#include "nodes/execnodes.h"
extern HashState *ExecInitHash(Hash *node, EState *estate, int eflags);
......@@ -48,5 +49,13 @@ extern void ExecChooseHashTableSize(double ntuples, int tupwidth, bool useskew,
int *numbatches,
int *num_skew_mcvs);
extern int ExecHashGetSkewBucket(HashJoinTable hashtable, uint32 hashvalue);
extern void ExecHashEstimate(HashState *node, ParallelContext *pcxt);
extern void ExecHashInitializeDSM(HashState *node, ParallelContext *pcxt);
extern void ExecHashInitializeWorker(HashState *node, ParallelWorkerContext *pwcxt);
extern void ExecHashReInitializeDSM(HashState *node, ParallelContext *pcxt);
extern void ExecHashRetrieveInstrumentation(HashState *node);
extern void ExecShutdownHash(HashState *node);
extern void ExecHashGetInstrumentation(HashInstrumentation *instrument,
HashJoinTable hashtable);
#endif /* NODEHASH_H */
......@@ -1980,6 +1980,29 @@ typedef struct GatherMergeState
struct binaryheap *gm_heap; /* binary heap of slot indices */
} GatherMergeState;
/* ----------------
* Values displayed by EXPLAIN ANALYZE
* ----------------
*/
typedef struct HashInstrumentation
{
int nbuckets; /* number of buckets at end of execution */
int nbuckets_original; /* planned number of buckets */
int nbatch; /* number of batches at end of execution */
int nbatch_original; /* planned number of batches */
size_t space_peak; /* speak memory usage in bytes */
} HashInstrumentation;
/* ----------------
* Shared memory container for per-worker hash information
* ----------------
*/
typedef struct SharedHashInfo
{
int num_workers;
HashInstrumentation hinstrument[FLEXIBLE_ARRAY_MEMBER];
} SharedHashInfo;
/* ----------------
* HashState information
* ----------------
......@@ -1990,6 +2013,9 @@ typedef struct HashState
HashJoinTable hashtable; /* hash table for the hashjoin */
List *hashkeys; /* list of ExprState nodes */
/* hashkeys is same as parent's hj_InnerHashKeys */
SharedHashInfo *shared_info; /* one entry per worker */
HashInstrumentation *hinstrument; /* this worker's entry */
} HashState;
/* ----------------
......
......@@ -6173,6 +6173,21 @@ $$);
rollback to settings;
-- A couple of other hash join tests unrelated to work_mem management.
-- Check that EXPLAIN ANALYZE has data even if the leader doesn't participate
savepoint settings;
set local max_parallel_workers_per_gather = 2;
set local work_mem = '4MB';
set local parallel_leader_participation = off;
select * from hash_join_batches(
$$
select count(*) from simple r join simple s using (id);
$$);
original | final
----------+-------
1 | 1
(1 row)
rollback to settings;
-- A full outer join where every record is matched.
-- non-parallel
savepoint settings;
......
......@@ -2159,6 +2159,17 @@ rollback to settings;
-- A couple of other hash join tests unrelated to work_mem management.
-- Check that EXPLAIN ANALYZE has data even if the leader doesn't participate
savepoint settings;
set local max_parallel_workers_per_gather = 2;
set local work_mem = '4MB';
set local parallel_leader_participation = off;
select * from hash_join_batches(
$$
select count(*) from simple r join simple s using (id);
$$);
rollback to settings;
-- A full outer join where every record is matched.
-- non-parallel
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment