Commit 3b8981b6 authored by Thomas Munro's avatar Thomas Munro

Fix race in Parallel Hash Join batch cleanup.

With very unlucky timing and parallel_leader_participation off, PHJ
could attempt to access per-batch state just as it was being freed.
There was code intended to prevent that by checking for a cleared
pointer, but it was buggy.

Fix, by introducing an extra barrier phase.  The new phase
PHJ_BUILD_RUNNING means that it's safe to access the per-batch state to
find a batch to help with, and PHJ_BUILD_DONE means that it is too late.
The last to detach will free the array of per-batch state as before, but
now it will also atomically advance the phase at the same time, so that
late attachers can avoid the hazard, without the data race.  This
mirrors the way per-batch hash tables are freed (see phases
PHJ_BATCH_PROBING and PHJ_BATCH_DONE).

Revealed by a one-off build farm failure, where BarrierAttach() failed a
sanity check assertion, because the memory had been clobbered by
dsa_free().

Back-patch to 11, where the code arrived.
Reported-by: default avatarMichael Paquier <michael@paquier.xyz>
Discussion: https://postgr.es/m/20200929061142.GA29096%40paquier.xyz
parent 37929599
...@@ -333,14 +333,21 @@ MultiExecParallelHash(HashState *node) ...@@ -333,14 +333,21 @@ MultiExecParallelHash(HashState *node)
hashtable->nbuckets = pstate->nbuckets; hashtable->nbuckets = pstate->nbuckets;
hashtable->log2_nbuckets = my_log2(hashtable->nbuckets); hashtable->log2_nbuckets = my_log2(hashtable->nbuckets);
hashtable->totalTuples = pstate->total_tuples; hashtable->totalTuples = pstate->total_tuples;
ExecParallelHashEnsureBatchAccessors(hashtable);
/*
* Unless we're completely done and the batch state has been freed, make
* sure we have accessors.
*/
if (BarrierPhase(build_barrier) < PHJ_BUILD_DONE)
ExecParallelHashEnsureBatchAccessors(hashtable);
/* /*
* The next synchronization point is in ExecHashJoin's HJ_BUILD_HASHTABLE * The next synchronization point is in ExecHashJoin's HJ_BUILD_HASHTABLE
* case, which will bring the build phase to PHJ_BUILD_DONE (if it isn't * case, which will bring the build phase to PHJ_BUILD_RUNNING (if it isn't
* there already). * there already).
*/ */
Assert(BarrierPhase(build_barrier) == PHJ_BUILD_HASHING_OUTER || Assert(BarrierPhase(build_barrier) == PHJ_BUILD_HASHING_OUTER ||
BarrierPhase(build_barrier) == PHJ_BUILD_RUNNING ||
BarrierPhase(build_barrier) == PHJ_BUILD_DONE); BarrierPhase(build_barrier) == PHJ_BUILD_DONE);
} }
...@@ -624,7 +631,7 @@ ExecHashTableCreate(HashState *state, List *hashOperators, List *hashCollations, ...@@ -624,7 +631,7 @@ ExecHashTableCreate(HashState *state, List *hashOperators, List *hashCollations,
/* /*
* The next Parallel Hash synchronization point is in * The next Parallel Hash synchronization point is in
* MultiExecParallelHash(), which will progress it all the way to * MultiExecParallelHash(), which will progress it all the way to
* PHJ_BUILD_DONE. The caller must not return control from this * PHJ_BUILD_RUNNING. The caller must not return control from this
* executor node between now and then. * executor node between now and then.
*/ */
} }
...@@ -3048,14 +3055,11 @@ ExecParallelHashEnsureBatchAccessors(HashJoinTable hashtable) ...@@ -3048,14 +3055,11 @@ ExecParallelHashEnsureBatchAccessors(HashJoinTable hashtable)
} }
/* /*
* It's possible for a backend to start up very late so that the whole * We should never see a state where the batch-tracking array is freed,
* join is finished and the shm state for tracking batches has already * because we should have given up sooner if we join when the build barrier
* been freed by ExecHashTableDetach(). In that case we'll just leave * has reached the PHJ_BUILD_DONE phase.
* hashtable->batches as NULL so that ExecParallelHashJoinNewBatch() gives
* up early.
*/ */
if (!DsaPointerIsValid(pstate->batches)) Assert(DsaPointerIsValid(pstate->batches));
return;
/* Use hash join memory context. */ /* Use hash join memory context. */
oldcxt = MemoryContextSwitchTo(hashtable->hashCxt); oldcxt = MemoryContextSwitchTo(hashtable->hashCxt);
...@@ -3175,9 +3179,17 @@ ExecHashTableDetachBatch(HashJoinTable hashtable) ...@@ -3175,9 +3179,17 @@ ExecHashTableDetachBatch(HashJoinTable hashtable)
void void
ExecHashTableDetach(HashJoinTable hashtable) ExecHashTableDetach(HashJoinTable hashtable)
{ {
if (hashtable->parallel_state) ParallelHashJoinState *pstate = hashtable->parallel_state;
/*
* If we're involved in a parallel query, we must either have got all the
* way to PHJ_BUILD_RUNNING, or joined too late and be in PHJ_BUILD_DONE.
*/
Assert(!pstate ||
BarrierPhase(&pstate->build_barrier) >= PHJ_BUILD_RUNNING);
if (pstate && BarrierPhase(&pstate->build_barrier) == PHJ_BUILD_RUNNING)
{ {
ParallelHashJoinState *pstate = hashtable->parallel_state;
int i; int i;
/* Make sure any temporary files are closed. */ /* Make sure any temporary files are closed. */
...@@ -3193,17 +3205,22 @@ ExecHashTableDetach(HashJoinTable hashtable) ...@@ -3193,17 +3205,22 @@ ExecHashTableDetach(HashJoinTable hashtable)
} }
/* If we're last to detach, clean up shared memory. */ /* If we're last to detach, clean up shared memory. */
if (BarrierDetach(&pstate->build_barrier)) if (BarrierArriveAndDetach(&pstate->build_barrier))
{ {
/*
* Late joining processes will see this state and give up
* immediately.
*/
Assert(BarrierPhase(&pstate->build_barrier) == PHJ_BUILD_DONE);
if (DsaPointerIsValid(pstate->batches)) if (DsaPointerIsValid(pstate->batches))
{ {
dsa_free(hashtable->area, pstate->batches); dsa_free(hashtable->area, pstate->batches);
pstate->batches = InvalidDsaPointer; pstate->batches = InvalidDsaPointer;
} }
} }
hashtable->parallel_state = NULL;
} }
hashtable->parallel_state = NULL;
} }
/* /*
......
...@@ -45,7 +45,8 @@ ...@@ -45,7 +45,8 @@
* PHJ_BUILD_ALLOCATING -- one sets up the batches and table 0 * PHJ_BUILD_ALLOCATING -- one sets up the batches and table 0
* PHJ_BUILD_HASHING_INNER -- all hash the inner rel * PHJ_BUILD_HASHING_INNER -- all hash the inner rel
* PHJ_BUILD_HASHING_OUTER -- (multi-batch only) all hash the outer * PHJ_BUILD_HASHING_OUTER -- (multi-batch only) all hash the outer
* PHJ_BUILD_DONE -- building done, probing can begin * PHJ_BUILD_RUNNING -- building done, probing can begin
* PHJ_BUILD_DONE -- all work complete, one frees batches
* *
* While in the phase PHJ_BUILD_HASHING_INNER a separate pair of barriers may * While in the phase PHJ_BUILD_HASHING_INNER a separate pair of barriers may
* be used repeatedly as required to coordinate expansions in the number of * be used repeatedly as required to coordinate expansions in the number of
...@@ -73,7 +74,7 @@ ...@@ -73,7 +74,7 @@
* batches whenever it encounters them while scanning and probing, which it * batches whenever it encounters them while scanning and probing, which it
* can do because it processes batches in serial order. * can do because it processes batches in serial order.
* *
* Once PHJ_BUILD_DONE is reached, backends then split up and process * Once PHJ_BUILD_RUNNING is reached, backends then split up and process
* different batches, or gang up and work together on probing batches if there * different batches, or gang up and work together on probing batches if there
* aren't enough to go around. For each batch there is a separate barrier * aren't enough to go around. For each batch there is a separate barrier
* with the following phases: * with the following phases:
...@@ -95,11 +96,16 @@ ...@@ -95,11 +96,16 @@
* *
* To avoid deadlocks, we never wait for any barrier unless it is known that * To avoid deadlocks, we never wait for any barrier unless it is known that
* all other backends attached to it are actively executing the node or have * all other backends attached to it are actively executing the node or have
* already arrived. Practically, that means that we never return a tuple * finished. Practically, that means that we never emit a tuple while attached
* while attached to a barrier, unless the barrier has reached its final * to a barrier, unless the barrier has reached a phase that means that no
* state. In the slightly special case of the per-batch barrier, we return * process will wait on it again. We emit tuples while attached to the build
* tuples while in PHJ_BATCH_PROBING phase, but that's OK because we use * barrier in phase PHJ_BUILD_RUNNING, and to a per-batch barrier in phase
* BarrierArriveAndDetach() to advance it to PHJ_BATCH_DONE without waiting. * PHJ_BATCH_PROBING. These are advanced to PHJ_BUILD_DONE and PHJ_BATCH_DONE
* respectively without waiting, using BarrierArriveAndDetach(). The last to
* detach receives a different return value so that it knows that it's safe to
* clean up. Any straggler process that attaches after that phase is reached
* will see that it's too late to participate or access the relevant shared
* memory objects.
* *
*------------------------------------------------------------------------- *-------------------------------------------------------------------------
*/ */
...@@ -317,6 +323,7 @@ ExecHashJoinImpl(PlanState *pstate, bool parallel) ...@@ -317,6 +323,7 @@ ExecHashJoinImpl(PlanState *pstate, bool parallel)
build_barrier = &parallel_state->build_barrier; build_barrier = &parallel_state->build_barrier;
Assert(BarrierPhase(build_barrier) == PHJ_BUILD_HASHING_OUTER || Assert(BarrierPhase(build_barrier) == PHJ_BUILD_HASHING_OUTER ||
BarrierPhase(build_barrier) == PHJ_BUILD_RUNNING ||
BarrierPhase(build_barrier) == PHJ_BUILD_DONE); BarrierPhase(build_barrier) == PHJ_BUILD_DONE);
if (BarrierPhase(build_barrier) == PHJ_BUILD_HASHING_OUTER) if (BarrierPhase(build_barrier) == PHJ_BUILD_HASHING_OUTER)
{ {
...@@ -329,9 +336,18 @@ ExecHashJoinImpl(PlanState *pstate, bool parallel) ...@@ -329,9 +336,18 @@ ExecHashJoinImpl(PlanState *pstate, bool parallel)
BarrierArriveAndWait(build_barrier, BarrierArriveAndWait(build_barrier,
WAIT_EVENT_HASH_BUILD_HASH_OUTER); WAIT_EVENT_HASH_BUILD_HASH_OUTER);
} }
Assert(BarrierPhase(build_barrier) == PHJ_BUILD_DONE); else if (BarrierPhase(build_barrier) == PHJ_BUILD_DONE)
{
/*
* If we attached so late that the job is finished and
* the batch state has been freed, we can return
* immediately.
*/
return NULL;
}
/* Each backend should now select a batch to work on. */ /* Each backend should now select a batch to work on. */
Assert(BarrierPhase(build_barrier) == PHJ_BUILD_RUNNING);
hashtable->curbatch = -1; hashtable->curbatch = -1;
node->hj_JoinState = HJ_NEED_NEW_BATCH; node->hj_JoinState = HJ_NEED_NEW_BATCH;
...@@ -1090,14 +1106,6 @@ ExecParallelHashJoinNewBatch(HashJoinState *hjstate) ...@@ -1090,14 +1106,6 @@ ExecParallelHashJoinNewBatch(HashJoinState *hjstate)
int start_batchno; int start_batchno;
int batchno; int batchno;
/*
* If we started up so late that the batch tracking array has been freed
* already by ExecHashTableDetach(), then we are finished. See also
* ExecParallelHashEnsureBatchAccessors().
*/
if (hashtable->batches == NULL)
return false;
/* /*
* If we were already attached to a batch, remember not to bother checking * If we were already attached to a batch, remember not to bother checking
* it again, and detach from it (possibly freeing the hash table if we are * it again, and detach from it (possibly freeing the hash table if we are
......
...@@ -258,7 +258,8 @@ typedef struct ParallelHashJoinState ...@@ -258,7 +258,8 @@ typedef struct ParallelHashJoinState
#define PHJ_BUILD_ALLOCATING 1 #define PHJ_BUILD_ALLOCATING 1
#define PHJ_BUILD_HASHING_INNER 2 #define PHJ_BUILD_HASHING_INNER 2
#define PHJ_BUILD_HASHING_OUTER 3 #define PHJ_BUILD_HASHING_OUTER 3
#define PHJ_BUILD_DONE 4 #define PHJ_BUILD_RUNNING 4
#define PHJ_BUILD_DONE 5
/* The phases for probing each batch, used by for batch_barrier. */ /* The phases for probing each batch, used by for batch_barrier. */
#define PHJ_BATCH_ELECTING 0 #define PHJ_BATCH_ELECTING 0
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment