Commit 7f7f25f1 authored by Thomas Munro's avatar Thomas Munro

Revert "Fix race in Parallel Hash Join batch cleanup."

This reverts commit 378802e3.
This reverts commit 3b8981b6.

Discussion: https://postgr.es/m/CA%2BhUKGJmcqAE3MZeDCLLXa62cWM0AJbKmp2JrJYaJ86bz36LFA%40mail.gmail.com
parent 9fd2952c
...@@ -246,10 +246,10 @@ MultiExecParallelHash(HashState *node) ...@@ -246,10 +246,10 @@ MultiExecParallelHash(HashState *node)
*/ */
pstate = hashtable->parallel_state; pstate = hashtable->parallel_state;
build_barrier = &pstate->build_barrier; build_barrier = &pstate->build_barrier;
Assert(BarrierPhase(build_barrier) >= PHJ_BUILD_ALLOCATE); Assert(BarrierPhase(build_barrier) >= PHJ_BUILD_ALLOCATING);
switch (BarrierPhase(build_barrier)) switch (BarrierPhase(build_barrier))
{ {
case PHJ_BUILD_ALLOCATE: case PHJ_BUILD_ALLOCATING:
/* /*
* Either I just allocated the initial hash table in * Either I just allocated the initial hash table in
...@@ -259,7 +259,7 @@ MultiExecParallelHash(HashState *node) ...@@ -259,7 +259,7 @@ MultiExecParallelHash(HashState *node)
BarrierArriveAndWait(build_barrier, WAIT_EVENT_HASH_BUILD_ALLOCATE); BarrierArriveAndWait(build_barrier, WAIT_EVENT_HASH_BUILD_ALLOCATE);
/* Fall through. */ /* Fall through. */
case PHJ_BUILD_HASH_INNER: case PHJ_BUILD_HASHING_INNER:
/* /*
* It's time to begin hashing, or if we just arrived here then * It's time to begin hashing, or if we just arrived here then
...@@ -271,10 +271,10 @@ MultiExecParallelHash(HashState *node) ...@@ -271,10 +271,10 @@ MultiExecParallelHash(HashState *node)
* below. * below.
*/ */
if (PHJ_GROW_BATCHES_PHASE(BarrierAttach(&pstate->grow_batches_barrier)) != if (PHJ_GROW_BATCHES_PHASE(BarrierAttach(&pstate->grow_batches_barrier)) !=
PHJ_GROW_BATCHES_ELECT) PHJ_GROW_BATCHES_ELECTING)
ExecParallelHashIncreaseNumBatches(hashtable); ExecParallelHashIncreaseNumBatches(hashtable);
if (PHJ_GROW_BUCKETS_PHASE(BarrierAttach(&pstate->grow_buckets_barrier)) != if (PHJ_GROW_BUCKETS_PHASE(BarrierAttach(&pstate->grow_buckets_barrier)) !=
PHJ_GROW_BUCKETS_ELECT) PHJ_GROW_BUCKETS_ELECTING)
ExecParallelHashIncreaseNumBuckets(hashtable); ExecParallelHashIncreaseNumBuckets(hashtable);
ExecParallelHashEnsureBatchAccessors(hashtable); ExecParallelHashEnsureBatchAccessors(hashtable);
ExecParallelHashTableSetCurrentBatch(hashtable, 0); ExecParallelHashTableSetCurrentBatch(hashtable, 0);
...@@ -333,22 +333,15 @@ MultiExecParallelHash(HashState *node) ...@@ -333,22 +333,15 @@ MultiExecParallelHash(HashState *node)
hashtable->nbuckets = pstate->nbuckets; hashtable->nbuckets = pstate->nbuckets;
hashtable->log2_nbuckets = my_log2(hashtable->nbuckets); hashtable->log2_nbuckets = my_log2(hashtable->nbuckets);
hashtable->totalTuples = pstate->total_tuples; hashtable->totalTuples = pstate->total_tuples;
ExecParallelHashEnsureBatchAccessors(hashtable);
/*
* Unless we're completely done and the batch state has been freed, make
* sure we have accessors.
*/
if (BarrierPhase(build_barrier) < PHJ_BUILD_FREE)
ExecParallelHashEnsureBatchAccessors(hashtable);
/* /*
* The next synchronization point is in ExecHashJoin's HJ_BUILD_HASHTABLE * The next synchronization point is in ExecHashJoin's HJ_BUILD_HASHTABLE
* case, which will bring the build phase to PHJ_BUILD_RUN (if it isn't * case, which will bring the build phase to PHJ_BUILD_DONE (if it isn't
* there already). * there already).
*/ */
Assert(BarrierPhase(build_barrier) == PHJ_BUILD_HASH_OUTER || Assert(BarrierPhase(build_barrier) == PHJ_BUILD_HASHING_OUTER ||
BarrierPhase(build_barrier) == PHJ_BUILD_RUN || BarrierPhase(build_barrier) == PHJ_BUILD_DONE);
BarrierPhase(build_barrier) == PHJ_BUILD_FREE);
} }
/* ---------------------------------------------------------------- /* ----------------------------------------------------------------
...@@ -596,8 +589,8 @@ ExecHashTableCreate(HashState *state, List *hashOperators, List *hashCollations, ...@@ -596,8 +589,8 @@ ExecHashTableCreate(HashState *state, List *hashOperators, List *hashCollations,
* Attach to the build barrier. The corresponding detach operation is * Attach to the build barrier. The corresponding detach operation is
* in ExecHashTableDetach. Note that we won't attach to the * in ExecHashTableDetach. Note that we won't attach to the
* batch_barrier for batch 0 yet. We'll attach later and start it out * batch_barrier for batch 0 yet. We'll attach later and start it out
* in PHJ_BATCH_PROBE phase, because batch 0 is allocated up front and * in PHJ_BATCH_PROBING phase, because batch 0 is allocated up front
* then loaded while hashing (the standard hybrid hash join * and then loaded while hashing (the standard hybrid hash join
* algorithm), and we'll coordinate that using build_barrier. * algorithm), and we'll coordinate that using build_barrier.
*/ */
build_barrier = &pstate->build_barrier; build_barrier = &pstate->build_barrier;
...@@ -610,7 +603,7 @@ ExecHashTableCreate(HashState *state, List *hashOperators, List *hashCollations, ...@@ -610,7 +603,7 @@ ExecHashTableCreate(HashState *state, List *hashOperators, List *hashCollations,
* SharedHashJoinBatch objects and the hash table for batch 0. One * SharedHashJoinBatch objects and the hash table for batch 0. One
* backend will be elected to do that now if necessary. * backend will be elected to do that now if necessary.
*/ */
if (BarrierPhase(build_barrier) == PHJ_BUILD_ELECT && if (BarrierPhase(build_barrier) == PHJ_BUILD_ELECTING &&
BarrierArriveAndWait(build_barrier, WAIT_EVENT_HASH_BUILD_ELECT)) BarrierArriveAndWait(build_barrier, WAIT_EVENT_HASH_BUILD_ELECT))
{ {
pstate->nbatch = nbatch; pstate->nbatch = nbatch;
...@@ -631,7 +624,7 @@ ExecHashTableCreate(HashState *state, List *hashOperators, List *hashCollations, ...@@ -631,7 +624,7 @@ ExecHashTableCreate(HashState *state, List *hashOperators, List *hashCollations,
/* /*
* The next Parallel Hash synchronization point is in * The next Parallel Hash synchronization point is in
* MultiExecParallelHash(), which will progress it all the way to * MultiExecParallelHash(), which will progress it all the way to
* PHJ_BUILD_RUN. The caller must not return control from this * PHJ_BUILD_DONE. The caller must not return control from this
* executor node between now and then. * executor node between now and then.
*/ */
} }
...@@ -1067,7 +1060,7 @@ ExecParallelHashIncreaseNumBatches(HashJoinTable hashtable) ...@@ -1067,7 +1060,7 @@ ExecParallelHashIncreaseNumBatches(HashJoinTable hashtable)
ParallelHashJoinState *pstate = hashtable->parallel_state; ParallelHashJoinState *pstate = hashtable->parallel_state;
int i; int i;
Assert(BarrierPhase(&pstate->build_barrier) == PHJ_BUILD_HASH_INNER); Assert(BarrierPhase(&pstate->build_barrier) == PHJ_BUILD_HASHING_INNER);
/* /*
* It's unlikely, but we need to be prepared for new participants to show * It's unlikely, but we need to be prepared for new participants to show
...@@ -1076,7 +1069,7 @@ ExecParallelHashIncreaseNumBatches(HashJoinTable hashtable) ...@@ -1076,7 +1069,7 @@ ExecParallelHashIncreaseNumBatches(HashJoinTable hashtable)
*/ */
switch (PHJ_GROW_BATCHES_PHASE(BarrierPhase(&pstate->grow_batches_barrier))) switch (PHJ_GROW_BATCHES_PHASE(BarrierPhase(&pstate->grow_batches_barrier)))
{ {
case PHJ_GROW_BATCHES_ELECT: case PHJ_GROW_BATCHES_ELECTING:
/* /*
* Elect one participant to prepare to grow the number of batches. * Elect one participant to prepare to grow the number of batches.
...@@ -1194,13 +1187,13 @@ ExecParallelHashIncreaseNumBatches(HashJoinTable hashtable) ...@@ -1194,13 +1187,13 @@ ExecParallelHashIncreaseNumBatches(HashJoinTable hashtable)
} }
/* Fall through. */ /* Fall through. */
case PHJ_GROW_BATCHES_REALLOCATE: case PHJ_GROW_BATCHES_ALLOCATING:
/* Wait for the above to be finished. */ /* Wait for the above to be finished. */
BarrierArriveAndWait(&pstate->grow_batches_barrier, BarrierArriveAndWait(&pstate->grow_batches_barrier,
WAIT_EVENT_HASH_GROW_BATCHES_REALLOCATE); WAIT_EVENT_HASH_GROW_BATCHES_ALLOCATE);
/* Fall through. */ /* Fall through. */
case PHJ_GROW_BATCHES_REPARTITION: case PHJ_GROW_BATCHES_REPARTITIONING:
/* Make sure that we have the current dimensions and buckets. */ /* Make sure that we have the current dimensions and buckets. */
ExecParallelHashEnsureBatchAccessors(hashtable); ExecParallelHashEnsureBatchAccessors(hashtable);
ExecParallelHashTableSetCurrentBatch(hashtable, 0); ExecParallelHashTableSetCurrentBatch(hashtable, 0);
...@@ -1213,7 +1206,7 @@ ExecParallelHashIncreaseNumBatches(HashJoinTable hashtable) ...@@ -1213,7 +1206,7 @@ ExecParallelHashIncreaseNumBatches(HashJoinTable hashtable)
WAIT_EVENT_HASH_GROW_BATCHES_REPARTITION); WAIT_EVENT_HASH_GROW_BATCHES_REPARTITION);
/* Fall through. */ /* Fall through. */
case PHJ_GROW_BATCHES_DECIDE: case PHJ_GROW_BATCHES_DECIDING:
/* /*
* Elect one participant to clean up and decide whether further * Elect one participant to clean up and decide whether further
...@@ -1268,7 +1261,7 @@ ExecParallelHashIncreaseNumBatches(HashJoinTable hashtable) ...@@ -1268,7 +1261,7 @@ ExecParallelHashIncreaseNumBatches(HashJoinTable hashtable)
} }
/* Fall through. */ /* Fall through. */
case PHJ_GROW_BATCHES_FINISH: case PHJ_GROW_BATCHES_FINISHING:
/* Wait for the above to complete. */ /* Wait for the above to complete. */
BarrierArriveAndWait(&pstate->grow_batches_barrier, BarrierArriveAndWait(&pstate->grow_batches_barrier,
WAIT_EVENT_HASH_GROW_BATCHES_FINISH); WAIT_EVENT_HASH_GROW_BATCHES_FINISH);
...@@ -1508,7 +1501,7 @@ ExecParallelHashIncreaseNumBuckets(HashJoinTable hashtable) ...@@ -1508,7 +1501,7 @@ ExecParallelHashIncreaseNumBuckets(HashJoinTable hashtable)
HashMemoryChunk chunk; HashMemoryChunk chunk;
dsa_pointer chunk_s; dsa_pointer chunk_s;
Assert(BarrierPhase(&pstate->build_barrier) == PHJ_BUILD_HASH_INNER); Assert(BarrierPhase(&pstate->build_barrier) == PHJ_BUILD_HASHING_INNER);
/* /*
* It's unlikely, but we need to be prepared for new participants to show * It's unlikely, but we need to be prepared for new participants to show
...@@ -1517,7 +1510,7 @@ ExecParallelHashIncreaseNumBuckets(HashJoinTable hashtable) ...@@ -1517,7 +1510,7 @@ ExecParallelHashIncreaseNumBuckets(HashJoinTable hashtable)
*/ */
switch (PHJ_GROW_BUCKETS_PHASE(BarrierPhase(&pstate->grow_buckets_barrier))) switch (PHJ_GROW_BUCKETS_PHASE(BarrierPhase(&pstate->grow_buckets_barrier)))
{ {
case PHJ_GROW_BUCKETS_ELECT: case PHJ_GROW_BUCKETS_ELECTING:
/* Elect one participant to prepare to increase nbuckets. */ /* Elect one participant to prepare to increase nbuckets. */
if (BarrierArriveAndWait(&pstate->grow_buckets_barrier, if (BarrierArriveAndWait(&pstate->grow_buckets_barrier,
WAIT_EVENT_HASH_GROW_BUCKETS_ELECT)) WAIT_EVENT_HASH_GROW_BUCKETS_ELECT))
...@@ -1546,13 +1539,13 @@ ExecParallelHashIncreaseNumBuckets(HashJoinTable hashtable) ...@@ -1546,13 +1539,13 @@ ExecParallelHashIncreaseNumBuckets(HashJoinTable hashtable)
} }
/* Fall through. */ /* Fall through. */
case PHJ_GROW_BUCKETS_REALLOCATE: case PHJ_GROW_BUCKETS_ALLOCATING:
/* Wait for the above to complete. */ /* Wait for the above to complete. */
BarrierArriveAndWait(&pstate->grow_buckets_barrier, BarrierArriveAndWait(&pstate->grow_buckets_barrier,
WAIT_EVENT_HASH_GROW_BUCKETS_REALLOCATE); WAIT_EVENT_HASH_GROW_BUCKETS_ALLOCATE);
/* Fall through. */ /* Fall through. */
case PHJ_GROW_BUCKETS_REINSERT: case PHJ_GROW_BUCKETS_REINSERTING:
/* Reinsert all tuples into the hash table. */ /* Reinsert all tuples into the hash table. */
ExecParallelHashEnsureBatchAccessors(hashtable); ExecParallelHashEnsureBatchAccessors(hashtable);
ExecParallelHashTableSetCurrentBatch(hashtable, 0); ExecParallelHashTableSetCurrentBatch(hashtable, 0);
...@@ -1708,7 +1701,7 @@ retry: ...@@ -1708,7 +1701,7 @@ retry:
/* Try to load it into memory. */ /* Try to load it into memory. */
Assert(BarrierPhase(&hashtable->parallel_state->build_barrier) == Assert(BarrierPhase(&hashtable->parallel_state->build_barrier) ==
PHJ_BUILD_HASH_INNER); PHJ_BUILD_HASHING_INNER);
hashTuple = ExecParallelHashTupleAlloc(hashtable, hashTuple = ExecParallelHashTupleAlloc(hashtable,
HJTUPLE_OVERHEAD + tuple->t_len, HJTUPLE_OVERHEAD + tuple->t_len,
&shared); &shared);
...@@ -2862,7 +2855,7 @@ ExecParallelHashTupleAlloc(HashJoinTable hashtable, size_t size, ...@@ -2862,7 +2855,7 @@ ExecParallelHashTupleAlloc(HashJoinTable hashtable, size_t size,
if (pstate->growth != PHJ_GROWTH_DISABLED) if (pstate->growth != PHJ_GROWTH_DISABLED)
{ {
Assert(curbatch == 0); Assert(curbatch == 0);
Assert(BarrierPhase(&pstate->build_barrier) == PHJ_BUILD_HASH_INNER); Assert(BarrierPhase(&pstate->build_barrier) == PHJ_BUILD_HASHING_INNER);
/* /*
* Check if our space limit would be exceeded. To avoid choking on * Check if our space limit would be exceeded. To avoid choking on
...@@ -2982,7 +2975,7 @@ ExecParallelHashJoinSetUpBatches(HashJoinTable hashtable, int nbatch) ...@@ -2982,7 +2975,7 @@ ExecParallelHashJoinSetUpBatches(HashJoinTable hashtable, int nbatch)
{ {
/* Batch 0 doesn't need to be loaded. */ /* Batch 0 doesn't need to be loaded. */
BarrierAttach(&shared->batch_barrier); BarrierAttach(&shared->batch_barrier);
while (BarrierPhase(&shared->batch_barrier) < PHJ_BATCH_PROBE) while (BarrierPhase(&shared->batch_barrier) < PHJ_BATCH_PROBING)
BarrierArriveAndWait(&shared->batch_barrier, 0); BarrierArriveAndWait(&shared->batch_barrier, 0);
BarrierDetach(&shared->batch_barrier); BarrierDetach(&shared->batch_barrier);
} }
...@@ -3055,11 +3048,14 @@ ExecParallelHashEnsureBatchAccessors(HashJoinTable hashtable) ...@@ -3055,11 +3048,14 @@ ExecParallelHashEnsureBatchAccessors(HashJoinTable hashtable)
} }
/* /*
* We should never see a state where the batch-tracking array is freed, * It's possible for a backend to start up very late so that the whole
* because we should have given up sooner if we join when the build * join is finished and the shm state for tracking batches has already
* barrier has reached the PHJ_BUILD_FREE phase. * been freed by ExecHashTableDetach(). In that case we'll just leave
* hashtable->batches as NULL so that ExecParallelHashJoinNewBatch() gives
* up early.
*/ */
Assert(DsaPointerIsValid(pstate->batches)); if (!DsaPointerIsValid(pstate->batches))
return;
/* Use hash join memory context. */ /* Use hash join memory context. */
oldcxt = MemoryContextSwitchTo(hashtable->hashCxt); oldcxt = MemoryContextSwitchTo(hashtable->hashCxt);
...@@ -3140,7 +3136,7 @@ ExecHashTableDetachBatch(HashJoinTable hashtable) ...@@ -3140,7 +3136,7 @@ ExecHashTableDetachBatch(HashJoinTable hashtable)
* longer attached, but since there is no way it's moving after * longer attached, but since there is no way it's moving after
* this point it seems safe to make the following assertion. * this point it seems safe to make the following assertion.
*/ */
Assert(BarrierPhase(&batch->batch_barrier) == PHJ_BATCH_FREE); Assert(BarrierPhase(&batch->batch_barrier) == PHJ_BATCH_DONE);
/* Free shared chunks and buckets. */ /* Free shared chunks and buckets. */
while (DsaPointerIsValid(batch->chunks)) while (DsaPointerIsValid(batch->chunks))
...@@ -3179,17 +3175,9 @@ ExecHashTableDetachBatch(HashJoinTable hashtable) ...@@ -3179,17 +3175,9 @@ ExecHashTableDetachBatch(HashJoinTable hashtable)
void void
ExecHashTableDetach(HashJoinTable hashtable) ExecHashTableDetach(HashJoinTable hashtable)
{ {
ParallelHashJoinState *pstate = hashtable->parallel_state; if (hashtable->parallel_state)
/*
* If we're involved in a parallel query, we must either have got all the
* way to PHJ_BUILD_RUN, or joined too late and be in PHJ_BUILD_FREE.
*/
Assert(!pstate ||
BarrierPhase(&pstate->build_barrier) >= PHJ_BUILD_RUN);
if (pstate && BarrierPhase(&pstate->build_barrier) == PHJ_BUILD_RUN)
{ {
ParallelHashJoinState *pstate = hashtable->parallel_state;
int i; int i;
/* Make sure any temporary files are closed. */ /* Make sure any temporary files are closed. */
...@@ -3205,22 +3193,17 @@ ExecHashTableDetach(HashJoinTable hashtable) ...@@ -3205,22 +3193,17 @@ ExecHashTableDetach(HashJoinTable hashtable)
} }
/* If we're last to detach, clean up shared memory. */ /* If we're last to detach, clean up shared memory. */
if (BarrierArriveAndDetach(&pstate->build_barrier)) if (BarrierDetach(&pstate->build_barrier))
{ {
/*
* Late joining processes will see this state and give up
* immediately.
*/
Assert(BarrierPhase(&pstate->build_barrier) == PHJ_BUILD_FREE);
if (DsaPointerIsValid(pstate->batches)) if (DsaPointerIsValid(pstate->batches))
{ {
dsa_free(hashtable->area, pstate->batches); dsa_free(hashtable->area, pstate->batches);
pstate->batches = InvalidDsaPointer; pstate->batches = InvalidDsaPointer;
} }
} }
hashtable->parallel_state = NULL;
} }
hashtable->parallel_state = NULL;
} }
/* /*
......
...@@ -39,30 +39,26 @@ ...@@ -39,30 +39,26 @@
* *
* One barrier called build_barrier is used to coordinate the hashing phases. * One barrier called build_barrier is used to coordinate the hashing phases.
* The phase is represented by an integer which begins at zero and increments * The phase is represented by an integer which begins at zero and increments
* one by one, but in the code it is referred to by symbolic names as follows. * one by one, but in the code it is referred to by symbolic names as follows:
* An asterisk indicates a phase that is performed by a single arbitrarily
* chosen process.
* *
* PHJ_BUILD_ELECT -- initial state * PHJ_BUILD_ELECTING -- initial state
* PHJ_BUILD_ALLOCATE* -- one sets up the batches and table 0 * PHJ_BUILD_ALLOCATING -- one sets up the batches and table 0
* PHJ_BUILD_HASH_INNER -- all hash the inner rel * PHJ_BUILD_HASHING_INNER -- all hash the inner rel
* PHJ_BUILD_HASH_OUTER -- (multi-batch only) all hash the outer * PHJ_BUILD_HASHING_OUTER -- (multi-batch only) all hash the outer
* PHJ_BUILD_RUN -- building done, probing can begin * PHJ_BUILD_DONE -- building done, probing can begin
* PHJ_BUILD_FREE* -- all work complete, one frees batches
* *
* While in the phase PHJ_BUILD_HASH_INNER a separate pair of barriers may * While in the phase PHJ_BUILD_HASHING_INNER a separate pair of barriers may
* be used repeatedly as required to coordinate expansions in the number of * be used repeatedly as required to coordinate expansions in the number of
* batches or buckets. Their phases are as follows: * batches or buckets. Their phases are as follows:
* *
* PHJ_GROW_BATCHES_ELECT -- initial state * PHJ_GROW_BATCHES_ELECTING -- initial state
* PHJ_GROW_BATCHES_REALLOCATE* -- one allocates new batches * PHJ_GROW_BATCHES_ALLOCATING -- one allocates new batches
* PHJ_GROW_BATCHES_REPARTITION -- all repartition * PHJ_GROW_BATCHES_REPARTITIONING -- all repartition
* PHJ_GROW_BATCHES_DECIDE* -- one detects skew and cleans up * PHJ_GROW_BATCHES_FINISHING -- one cleans up, detects skew
* PHJ_GROW_BATCHES_FINISH -- finished one growth cycle
* *
* PHJ_GROW_BUCKETS_ELECT -- initial state * PHJ_GROW_BUCKETS_ELECTING -- initial state
* PHJ_GROW_BUCKETS_REALLOCATE* -- one allocates new buckets * PHJ_GROW_BUCKETS_ALLOCATING -- one allocates new buckets
* PHJ_GROW_BUCKETS_REINSERT -- all insert tuples * PHJ_GROW_BUCKETS_REINSERTING -- all insert tuples
* *
* If the planner got the number of batches and buckets right, those won't be * If the planner got the number of batches and buckets right, those won't be
* necessary, but on the other hand we might finish up needing to expand the * necessary, but on the other hand we might finish up needing to expand the
...@@ -70,27 +66,27 @@ ...@@ -70,27 +66,27 @@
* within our memory budget and load factor target. For that reason it's a * within our memory budget and load factor target. For that reason it's a
* separate pair of barriers using circular phases. * separate pair of barriers using circular phases.
* *
* The PHJ_BUILD_HASH_OUTER phase is required only for multi-batch joins, * The PHJ_BUILD_HASHING_OUTER phase is required only for multi-batch joins,
* because we need to divide the outer relation into batches up front in order * because we need to divide the outer relation into batches up front in order
* to be able to process batches entirely independently. In contrast, the * to be able to process batches entirely independently. In contrast, the
* parallel-oblivious algorithm simply throws tuples 'forward' to 'later' * parallel-oblivious algorithm simply throws tuples 'forward' to 'later'
* batches whenever it encounters them while scanning and probing, which it * batches whenever it encounters them while scanning and probing, which it
* can do because it processes batches in serial order. * can do because it processes batches in serial order.
* *
* Once PHJ_BUILD_RUN is reached, backends then split up and process * Once PHJ_BUILD_DONE is reached, backends then split up and process
* different batches, or gang up and work together on probing batches if there * different batches, or gang up and work together on probing batches if there
* aren't enough to go around. For each batch there is a separate barrier * aren't enough to go around. For each batch there is a separate barrier
* with the following phases: * with the following phases:
* *
* PHJ_BATCH_ELECT -- initial state * PHJ_BATCH_ELECTING -- initial state
* PHJ_BATCH_ALLOCATE* -- one allocates buckets * PHJ_BATCH_ALLOCATING -- one allocates buckets
* PHJ_BATCH_LOAD -- all load the hash table from disk * PHJ_BATCH_LOADING -- all load the hash table from disk
* PHJ_BATCH_PROBE -- all probe * PHJ_BATCH_PROBING -- all probe
* PHJ_BATCH_FREE* -- one frees memory * PHJ_BATCH_DONE -- end
* *
* Batch 0 is a special case, because it starts out in phase * Batch 0 is a special case, because it starts out in phase
* PHJ_BATCH_PROBE; populating batch 0's hash table is done during * PHJ_BATCH_PROBING; populating batch 0's hash table is done during
* PHJ_BUILD_HASH_INNER so we can skip loading. * PHJ_BUILD_HASHING_INNER so we can skip loading.
* *
* Initially we try to plan for a single-batch hash join using the combined * Initially we try to plan for a single-batch hash join using the combined
* hash_mem of all participants to create a large shared hash table. If that * hash_mem of all participants to create a large shared hash table. If that
...@@ -99,16 +95,11 @@ ...@@ -99,16 +95,11 @@
* *
* To avoid deadlocks, we never wait for any barrier unless it is known that * To avoid deadlocks, we never wait for any barrier unless it is known that
* all other backends attached to it are actively executing the node or have * all other backends attached to it are actively executing the node or have
* finished. Practically, that means that we never emit a tuple while attached * already arrived. Practically, that means that we never return a tuple
* to a barrier, unless the barrier has reached a phase that means that no * while attached to a barrier, unless the barrier has reached its final
* process will wait on it again. We emit tuples while attached to the build * state. In the slightly special case of the per-batch barrier, we return
* barrier in phase PHJ_BUILD_RUN, and to a per-batch barrier in phase * tuples while in PHJ_BATCH_PROBING phase, but that's OK because we use
* PHJ_BATCH_PROBE. These are advanced to PHJ_BUILD_FREE and PHJ_BATCH_FREE * BarrierArriveAndDetach() to advance it to PHJ_BATCH_DONE without waiting.
* respectively without waiting, using BarrierArriveAndDetach(). The last to
* detach receives a different return value so that it knows that it's safe to
* clean up. Any straggler process that attaches after that phase is reached
* will see that it's too late to participate or access the relevant shared
* memory objects.
* *
*------------------------------------------------------------------------- *-------------------------------------------------------------------------
*/ */
...@@ -325,10 +316,9 @@ ExecHashJoinImpl(PlanState *pstate, bool parallel) ...@@ -325,10 +316,9 @@ ExecHashJoinImpl(PlanState *pstate, bool parallel)
Barrier *build_barrier; Barrier *build_barrier;
build_barrier = &parallel_state->build_barrier; build_barrier = &parallel_state->build_barrier;
Assert(BarrierPhase(build_barrier) == PHJ_BUILD_HASH_OUTER || Assert(BarrierPhase(build_barrier) == PHJ_BUILD_HASHING_OUTER ||
BarrierPhase(build_barrier) == PHJ_BUILD_RUN || BarrierPhase(build_barrier) == PHJ_BUILD_DONE);
BarrierPhase(build_barrier) == PHJ_BUILD_FREE); if (BarrierPhase(build_barrier) == PHJ_BUILD_HASHING_OUTER)
if (BarrierPhase(build_barrier) == PHJ_BUILD_HASH_OUTER)
{ {
/* /*
* If multi-batch, we need to hash the outer relation * If multi-batch, we need to hash the outer relation
...@@ -339,18 +329,9 @@ ExecHashJoinImpl(PlanState *pstate, bool parallel) ...@@ -339,18 +329,9 @@ ExecHashJoinImpl(PlanState *pstate, bool parallel)
BarrierArriveAndWait(build_barrier, BarrierArriveAndWait(build_barrier,
WAIT_EVENT_HASH_BUILD_HASH_OUTER); WAIT_EVENT_HASH_BUILD_HASH_OUTER);
} }
else if (BarrierPhase(build_barrier) == PHJ_BUILD_FREE) Assert(BarrierPhase(build_barrier) == PHJ_BUILD_DONE);
{
/*
* If we attached so late that the job is finished and
* the batch state has been freed, we can return
* immediately.
*/
return NULL;
}
/* Each backend should now select a batch to work on. */ /* Each backend should now select a batch to work on. */
Assert(BarrierPhase(build_barrier) == PHJ_BUILD_RUN);
hashtable->curbatch = -1; hashtable->curbatch = -1;
node->hj_JoinState = HJ_NEED_NEW_BATCH; node->hj_JoinState = HJ_NEED_NEW_BATCH;
...@@ -1109,6 +1090,14 @@ ExecParallelHashJoinNewBatch(HashJoinState *hjstate) ...@@ -1109,6 +1090,14 @@ ExecParallelHashJoinNewBatch(HashJoinState *hjstate)
int start_batchno; int start_batchno;
int batchno; int batchno;
/*
* If we started up so late that the batch tracking array has been freed
* already by ExecHashTableDetach(), then we are finished. See also
* ExecParallelHashEnsureBatchAccessors().
*/
if (hashtable->batches == NULL)
return false;
/* /*
* If we were already attached to a batch, remember not to bother checking * If we were already attached to a batch, remember not to bother checking
* it again, and detach from it (possibly freeing the hash table if we are * it again, and detach from it (possibly freeing the hash table if we are
...@@ -1142,7 +1131,7 @@ ExecParallelHashJoinNewBatch(HashJoinState *hjstate) ...@@ -1142,7 +1131,7 @@ ExecParallelHashJoinNewBatch(HashJoinState *hjstate)
switch (BarrierAttach(batch_barrier)) switch (BarrierAttach(batch_barrier))
{ {
case PHJ_BATCH_ELECT: case PHJ_BATCH_ELECTING:
/* One backend allocates the hash table. */ /* One backend allocates the hash table. */
if (BarrierArriveAndWait(batch_barrier, if (BarrierArriveAndWait(batch_barrier,
...@@ -1150,13 +1139,13 @@ ExecParallelHashJoinNewBatch(HashJoinState *hjstate) ...@@ -1150,13 +1139,13 @@ ExecParallelHashJoinNewBatch(HashJoinState *hjstate)
ExecParallelHashTableAlloc(hashtable, batchno); ExecParallelHashTableAlloc(hashtable, batchno);
/* Fall through. */ /* Fall through. */
case PHJ_BATCH_ALLOCATE: case PHJ_BATCH_ALLOCATING:
/* Wait for allocation to complete. */ /* Wait for allocation to complete. */
BarrierArriveAndWait(batch_barrier, BarrierArriveAndWait(batch_barrier,
WAIT_EVENT_HASH_BATCH_ALLOCATE); WAIT_EVENT_HASH_BATCH_ALLOCATE);
/* Fall through. */ /* Fall through. */
case PHJ_BATCH_LOAD: case PHJ_BATCH_LOADING:
/* Start (or join in) loading tuples. */ /* Start (or join in) loading tuples. */
ExecParallelHashTableSetCurrentBatch(hashtable, batchno); ExecParallelHashTableSetCurrentBatch(hashtable, batchno);
inner_tuples = hashtable->batches[batchno].inner_tuples; inner_tuples = hashtable->batches[batchno].inner_tuples;
...@@ -1176,7 +1165,7 @@ ExecParallelHashJoinNewBatch(HashJoinState *hjstate) ...@@ -1176,7 +1165,7 @@ ExecParallelHashJoinNewBatch(HashJoinState *hjstate)
WAIT_EVENT_HASH_BATCH_LOAD); WAIT_EVENT_HASH_BATCH_LOAD);
/* Fall through. */ /* Fall through. */
case PHJ_BATCH_PROBE: case PHJ_BATCH_PROBING:
/* /*
* This batch is ready to probe. Return control to * This batch is ready to probe. Return control to
...@@ -1186,13 +1175,13 @@ ExecParallelHashJoinNewBatch(HashJoinState *hjstate) ...@@ -1186,13 +1175,13 @@ ExecParallelHashJoinNewBatch(HashJoinState *hjstate)
* this barrier again (or else a deadlock could occur). * this barrier again (or else a deadlock could occur).
* All attached participants must eventually call * All attached participants must eventually call
* BarrierArriveAndDetach() so that the final phase * BarrierArriveAndDetach() so that the final phase
* PHJ_BATCH_FREE can be reached. * PHJ_BATCH_DONE can be reached.
*/ */
ExecParallelHashTableSetCurrentBatch(hashtable, batchno); ExecParallelHashTableSetCurrentBatch(hashtable, batchno);
sts_begin_parallel_scan(hashtable->batches[batchno].outer_tuples); sts_begin_parallel_scan(hashtable->batches[batchno].outer_tuples);
return true; return true;
case PHJ_BATCH_FREE: case PHJ_BATCH_DONE:
/* /*
* Already done. Detach and go around again (if any * Already done. Detach and go around again (if any
...@@ -1519,7 +1508,7 @@ ExecHashJoinReInitializeDSM(HashJoinState *state, ParallelContext *cxt) ...@@ -1519,7 +1508,7 @@ ExecHashJoinReInitializeDSM(HashJoinState *state, ParallelContext *cxt)
/* /*
* It would be possible to reuse the shared hash table in single-batch * It would be possible to reuse the shared hash table in single-batch
* cases by resetting and then fast-forwarding build_barrier to * cases by resetting and then fast-forwarding build_barrier to
* PHJ_BUILD_FREE and batch 0's batch_barrier to PHJ_BATCH_PROBE, but * PHJ_BUILD_DONE and batch 0's batch_barrier to PHJ_BATCH_PROBING, but
* currently shared hash tables are already freed by now (by the last * currently shared hash tables are already freed by now (by the last
* participant to detach from the batch). We could consider keeping it * participant to detach from the batch). We could consider keeping it
* around for single-batch joins. We'd also need to adjust * around for single-batch joins. We'd also need to adjust
...@@ -1538,7 +1527,7 @@ ExecHashJoinReInitializeDSM(HashJoinState *state, ParallelContext *cxt) ...@@ -1538,7 +1527,7 @@ ExecHashJoinReInitializeDSM(HashJoinState *state, ParallelContext *cxt)
/* Clear any shared batch files. */ /* Clear any shared batch files. */
SharedFileSetDeleteAll(&pstate->fileset); SharedFileSetDeleteAll(&pstate->fileset);
/* Reset build_barrier to PHJ_BUILD_ELECT so we can go around again. */ /* Reset build_barrier to PHJ_BUILD_ELECTING so we can go around again. */
BarrierInit(&pstate->build_barrier, 0); BarrierInit(&pstate->build_barrier, 0);
} }
......
...@@ -4043,8 +4043,8 @@ pgstat_get_wait_ipc(WaitEventIPC w) ...@@ -4043,8 +4043,8 @@ pgstat_get_wait_ipc(WaitEventIPC w)
case WAIT_EVENT_HASH_BUILD_HASH_OUTER: case WAIT_EVENT_HASH_BUILD_HASH_OUTER:
event_name = "HashBuildHashOuter"; event_name = "HashBuildHashOuter";
break; break;
case WAIT_EVENT_HASH_GROW_BATCHES_REALLOCATE: case WAIT_EVENT_HASH_GROW_BATCHES_ALLOCATE:
event_name = "HashGrowBatchesReallocate"; event_name = "HashGrowBatchesAllocate";
break; break;
case WAIT_EVENT_HASH_GROW_BATCHES_DECIDE: case WAIT_EVENT_HASH_GROW_BATCHES_DECIDE:
event_name = "HashGrowBatchesDecide"; event_name = "HashGrowBatchesDecide";
...@@ -4058,8 +4058,8 @@ pgstat_get_wait_ipc(WaitEventIPC w) ...@@ -4058,8 +4058,8 @@ pgstat_get_wait_ipc(WaitEventIPC w)
case WAIT_EVENT_HASH_GROW_BATCHES_REPARTITION: case WAIT_EVENT_HASH_GROW_BATCHES_REPARTITION:
event_name = "HashGrowBatchesRepartition"; event_name = "HashGrowBatchesRepartition";
break; break;
case WAIT_EVENT_HASH_GROW_BUCKETS_REALLOCATE: case WAIT_EVENT_HASH_GROW_BUCKETS_ALLOCATE:
event_name = "HashGrowBucketsReallocate"; event_name = "HashGrowBucketsAllocate";
break; break;
case WAIT_EVENT_HASH_GROW_BUCKETS_ELECT: case WAIT_EVENT_HASH_GROW_BUCKETS_ELECT:
event_name = "HashGrowBucketsElect"; event_name = "HashGrowBucketsElect";
......
...@@ -254,32 +254,31 @@ typedef struct ParallelHashJoinState ...@@ -254,32 +254,31 @@ typedef struct ParallelHashJoinState
} ParallelHashJoinState; } ParallelHashJoinState;
/* The phases for building batches, used by build_barrier. */ /* The phases for building batches, used by build_barrier. */
#define PHJ_BUILD_ELECT 0 #define PHJ_BUILD_ELECTING 0
#define PHJ_BUILD_ALLOCATE 1 #define PHJ_BUILD_ALLOCATING 1
#define PHJ_BUILD_HASH_INNER 2 #define PHJ_BUILD_HASHING_INNER 2
#define PHJ_BUILD_HASH_OUTER 3 #define PHJ_BUILD_HASHING_OUTER 3
#define PHJ_BUILD_RUN 4 #define PHJ_BUILD_DONE 4
#define PHJ_BUILD_FREE 5
/* The phases for probing each batch, used by for batch_barrier. */ /* The phases for probing each batch, used by for batch_barrier. */
#define PHJ_BATCH_ELECT 0 #define PHJ_BATCH_ELECTING 0
#define PHJ_BATCH_ALLOCATE 1 #define PHJ_BATCH_ALLOCATING 1
#define PHJ_BATCH_LOAD 2 #define PHJ_BATCH_LOADING 2
#define PHJ_BATCH_PROBE 3 #define PHJ_BATCH_PROBING 3
#define PHJ_BATCH_FREE 4 #define PHJ_BATCH_DONE 4
/* The phases of batch growth while hashing, for grow_batches_barrier. */ /* The phases of batch growth while hashing, for grow_batches_barrier. */
#define PHJ_GROW_BATCHES_ELECT 0 #define PHJ_GROW_BATCHES_ELECTING 0
#define PHJ_GROW_BATCHES_REALLOCATE 1 #define PHJ_GROW_BATCHES_ALLOCATING 1
#define PHJ_GROW_BATCHES_REPARTITION 2 #define PHJ_GROW_BATCHES_REPARTITIONING 2
#define PHJ_GROW_BATCHES_DECIDE 3 #define PHJ_GROW_BATCHES_DECIDING 3
#define PHJ_GROW_BATCHES_FINISH 4 #define PHJ_GROW_BATCHES_FINISHING 4
#define PHJ_GROW_BATCHES_PHASE(n) ((n) % 5) /* circular phases */ #define PHJ_GROW_BATCHES_PHASE(n) ((n) % 5) /* circular phases */
/* The phases of bucket growth while hashing, for grow_buckets_barrier. */ /* The phases of bucket growth while hashing, for grow_buckets_barrier. */
#define PHJ_GROW_BUCKETS_ELECT 0 #define PHJ_GROW_BUCKETS_ELECTING 0
#define PHJ_GROW_BUCKETS_REALLOCATE 1 #define PHJ_GROW_BUCKETS_ALLOCATING 1
#define PHJ_GROW_BUCKETS_REINSERT 2 #define PHJ_GROW_BUCKETS_REINSERTING 2
#define PHJ_GROW_BUCKETS_PHASE(n) ((n) % 3) /* circular phases */ #define PHJ_GROW_BUCKETS_PHASE(n) ((n) % 3) /* circular phases */
typedef struct HashJoinTableData typedef struct HashJoinTableData
......
...@@ -982,12 +982,12 @@ typedef enum ...@@ -982,12 +982,12 @@ typedef enum
WAIT_EVENT_HASH_BUILD_ELECT, WAIT_EVENT_HASH_BUILD_ELECT,
WAIT_EVENT_HASH_BUILD_HASH_INNER, WAIT_EVENT_HASH_BUILD_HASH_INNER,
WAIT_EVENT_HASH_BUILD_HASH_OUTER, WAIT_EVENT_HASH_BUILD_HASH_OUTER,
WAIT_EVENT_HASH_GROW_BATCHES_REALLOCATE, WAIT_EVENT_HASH_GROW_BATCHES_ALLOCATE,
WAIT_EVENT_HASH_GROW_BATCHES_DECIDE, WAIT_EVENT_HASH_GROW_BATCHES_DECIDE,
WAIT_EVENT_HASH_GROW_BATCHES_ELECT, WAIT_EVENT_HASH_GROW_BATCHES_ELECT,
WAIT_EVENT_HASH_GROW_BATCHES_FINISH, WAIT_EVENT_HASH_GROW_BATCHES_FINISH,
WAIT_EVENT_HASH_GROW_BATCHES_REPARTITION, WAIT_EVENT_HASH_GROW_BATCHES_REPARTITION,
WAIT_EVENT_HASH_GROW_BUCKETS_REALLOCATE, WAIT_EVENT_HASH_GROW_BUCKETS_ALLOCATE,
WAIT_EVENT_HASH_GROW_BUCKETS_ELECT, WAIT_EVENT_HASH_GROW_BUCKETS_ELECT,
WAIT_EVENT_HASH_GROW_BUCKETS_REINSERT, WAIT_EVENT_HASH_GROW_BUCKETS_REINSERT,
WAIT_EVENT_LOGICAL_SYNC_DATA, WAIT_EVENT_LOGICAL_SYNC_DATA,
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment