Commit cf1234a1 authored by Andres Freund's avatar Andres Freund

Fix deadlock danger when atomic ops are done under spinlock.

This was a danger only for --disable-spinlocks in combination with
atomic operations unsupported by the current platform.

While atomics.c was careful to signal that a separate semaphore ought
to be used when spinlock emulation is active, spin.c didn't actually
implement that mechanism. That's my (Andres') fault, it seems to have
gotten lost during the development of the atomic operations support.

Fix that issue and add test for nesting atomic operations inside a
spinlock.

Author: Andres Freund
Discussion: https://postgr.es/m/20200605023302.g6v3ydozy5txifji@alap3.anarazel.de
Backpatch: 9.5-
parent 3b37a6de
...@@ -28,8 +28,24 @@ ...@@ -28,8 +28,24 @@
#ifndef HAVE_SPINLOCKS #ifndef HAVE_SPINLOCKS
/*
* No TAS, so spinlocks are implemented as PGSemaphores.
*/
#ifndef HAVE_ATOMICS
#define NUM_EMULATION_SEMAPHORES (NUM_SPINLOCK_SEMAPHORES + NUM_ATOMICS_SEMAPHORES)
#else
#define NUM_EMULATION_SEMAPHORES (NUM_SPINLOCK_SEMAPHORES)
#endif /* DISABLE_ATOMICS */
PGSemaphore *SpinlockSemaArray; PGSemaphore *SpinlockSemaArray;
#endif
#else /* !HAVE_SPINLOCKS */
#define NUM_EMULATION_SEMAPHORES 0
#endif /* HAVE_SPINLOCKS */
/* /*
* Report the amount of shared memory needed to store semaphores for spinlock * Report the amount of shared memory needed to store semaphores for spinlock
...@@ -38,34 +54,19 @@ PGSemaphore *SpinlockSemaArray; ...@@ -38,34 +54,19 @@ PGSemaphore *SpinlockSemaArray;
Size Size
SpinlockSemaSize(void) SpinlockSemaSize(void)
{ {
return SpinlockSemas() * sizeof(PGSemaphore); return NUM_EMULATION_SEMAPHORES * sizeof(PGSemaphore);
} }
#ifdef HAVE_SPINLOCKS
/* /*
* Report number of semaphores needed to support spinlocks. * Report number of semaphores needed to support spinlocks.
*/ */
int int
SpinlockSemas(void) SpinlockSemas(void)
{ {
return 0; return NUM_EMULATION_SEMAPHORES;
} }
#else /* !HAVE_SPINLOCKS */
/* #ifndef HAVE_SPINLOCKS
* No TAS, so spinlocks are implemented as PGSemaphores.
*/
/*
* Report number of semaphores needed to support spinlocks.
*/
int
SpinlockSemas(void)
{
return NUM_SPINLOCK_SEMAPHORES + NUM_ATOMICS_SEMAPHORES;
}
/* /*
* Initialize spinlock emulation. * Initialize spinlock emulation.
...@@ -92,23 +93,59 @@ SpinlockSemaInit(void) ...@@ -92,23 +93,59 @@ SpinlockSemaInit(void)
/* /*
* s_lock.h hardware-spinlock emulation using semaphores * s_lock.h hardware-spinlock emulation using semaphores
* *
* We map all spinlocks onto a set of NUM_SPINLOCK_SEMAPHORES semaphores. * We map all spinlocks onto NUM_EMULATION_SEMAPHORES semaphores. It's okay to
* It's okay to map multiple spinlocks onto one semaphore because no process * map multiple spinlocks onto one semaphore because no process should ever
* should ever hold more than one at a time. We just need enough semaphores * hold more than one at a time. We just need enough semaphores so that we
* so that we aren't adding too much extra contention from that. * aren't adding too much extra contention from that.
*
* There is one exception to the restriction of only holding one spinlock at a
* time, which is that it's ok if emulated atomic operations are nested inside
* spinlocks. To avoid the danger of spinlocks and atomic using the same sema,
* we make sure "normal" spinlocks and atomics backed by spinlocks use
* distinct semaphores (see the nested argument to s_init_lock_sema).
* *
* slock_t is just an int for this implementation; it holds the spinlock * slock_t is just an int for this implementation; it holds the spinlock
* number from 1..NUM_SPINLOCK_SEMAPHORES. We intentionally ensure that 0 * number from 1..NUM_EMULATION_SEMAPHORES. We intentionally ensure that 0
* is not a valid value, so that testing with this code can help find * is not a valid value, so that testing with this code can help find
* failures to initialize spinlocks. * failures to initialize spinlocks.
*/ */
static inline void
s_check_valid(int lockndx)
{
if (unlikely(lockndx <= 0 || lockndx > NUM_EMULATION_SEMAPHORES))
elog(ERROR, "invalid spinlock number: %d", lockndx);
}
void void
s_init_lock_sema(volatile slock_t *lock, bool nested) s_init_lock_sema(volatile slock_t *lock, bool nested)
{ {
static uint32 counter = 0; static uint32 counter = 0;
uint32 offset;
*lock = ((++counter) % NUM_SPINLOCK_SEMAPHORES) + 1; uint32 sema_total;
uint32 idx;
if (nested)
{
/*
* To allow nesting atomics inside spinlocked sections, use a
* different spinlock. See comment above.
*/
offset = 1 + NUM_SPINLOCK_SEMAPHORES;
sema_total = NUM_ATOMICS_SEMAPHORES;
}
else
{
offset = 1;
sema_total = NUM_SPINLOCK_SEMAPHORES;
}
idx = (counter++ % sema_total) + offset;
/* double check we did things correctly */
s_check_valid(idx);
*lock = idx;
} }
void void
...@@ -116,8 +153,8 @@ s_unlock_sema(volatile slock_t *lock) ...@@ -116,8 +153,8 @@ s_unlock_sema(volatile slock_t *lock)
{ {
int lockndx = *lock; int lockndx = *lock;
if (lockndx <= 0 || lockndx > NUM_SPINLOCK_SEMAPHORES) s_check_valid(lockndx);
elog(ERROR, "invalid spinlock number: %d", lockndx);
PGSemaphoreUnlock(SpinlockSemaArray[lockndx - 1]); PGSemaphoreUnlock(SpinlockSemaArray[lockndx - 1]);
} }
...@@ -134,8 +171,8 @@ tas_sema(volatile slock_t *lock) ...@@ -134,8 +171,8 @@ tas_sema(volatile slock_t *lock)
{ {
int lockndx = *lock; int lockndx = *lock;
if (lockndx <= 0 || lockndx > NUM_SPINLOCK_SEMAPHORES) s_check_valid(lockndx);
elog(ERROR, "invalid spinlock number: %d", lockndx);
/* Note that TAS macros return 0 if *success* */ /* Note that TAS macros return 0 if *success* */
return !PGSemaphoreTryLock(SpinlockSemaArray[lockndx - 1]); return !PGSemaphoreTryLock(SpinlockSemaArray[lockndx - 1]);
} }
......
...@@ -898,6 +898,56 @@ test_spinlock(void) ...@@ -898,6 +898,56 @@ test_spinlock(void)
#endif #endif
} }
/*
* Verify that performing atomic ops inside a spinlock isn't a
* problem. Realistically that's only going to be a problem when both
* --disable-spinlocks and --disable-atomics are used, but it's cheap enough
* to just always test.
*
* The test works by initializing enough atomics that we'd conflict if there
* were an overlap between a spinlock and an atomic by holding a spinlock
* while manipulating more than NUM_SPINLOCK_SEMAPHORES atomics.
*
* NUM_TEST_ATOMICS doesn't really need to be more than
* NUM_SPINLOCK_SEMAPHORES, but it seems better to test a bit more
* extensively.
*/
static void
test_atomic_spin_nest(void)
{
slock_t lock;
#define NUM_TEST_ATOMICS (NUM_SPINLOCK_SEMAPHORES + NUM_ATOMICS_SEMAPHORES + 27)
pg_atomic_uint32 atomics32[NUM_TEST_ATOMICS];
pg_atomic_uint64 atomics64[NUM_TEST_ATOMICS];
SpinLockInit(&lock);
for (int i = 0; i < NUM_TEST_ATOMICS; i++)
{
pg_atomic_init_u32(&atomics32[i], 0);
pg_atomic_init_u64(&atomics64[i], 0);
}
/* just so it's not all zeroes */
for (int i = 0; i < NUM_TEST_ATOMICS; i++)
{
EXPECT_EQ_U32(pg_atomic_fetch_add_u32(&atomics32[i], i), 0);
EXPECT_EQ_U64(pg_atomic_fetch_add_u64(&atomics64[i], i), 0);
}
/* test whether we can do atomic op with lock held */
SpinLockAcquire(&lock);
for (int i = 0; i < NUM_TEST_ATOMICS; i++)
{
EXPECT_EQ_U32(pg_atomic_fetch_sub_u32(&atomics32[i], i), i);
EXPECT_EQ_U32(pg_atomic_read_u32(&atomics32[i]), 0);
EXPECT_EQ_U64(pg_atomic_fetch_sub_u64(&atomics64[i], i), i);
EXPECT_EQ_U64(pg_atomic_read_u64(&atomics64[i]), 0);
}
SpinLockRelease(&lock);
}
#undef NUM_TEST_ATOMICS
PG_FUNCTION_INFO_V1(test_atomic_ops); PG_FUNCTION_INFO_V1(test_atomic_ops);
Datum Datum
test_atomic_ops(PG_FUNCTION_ARGS) test_atomic_ops(PG_FUNCTION_ARGS)
...@@ -914,6 +964,8 @@ test_atomic_ops(PG_FUNCTION_ARGS) ...@@ -914,6 +964,8 @@ test_atomic_ops(PG_FUNCTION_ARGS)
*/ */
test_spinlock(); test_spinlock();
test_atomic_spin_nest();
PG_RETURN_BOOL(true); PG_RETURN_BOOL(true);
} }
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment