Commit 008608b9 authored by Andres Freund's avatar Andres Freund

Avoid the use of a separate spinlock to protect a LWLock's wait queue.

Previously we used a spinlock, in adition to the atomically manipulated
->state field, to protect the wait queue. But it's pretty simple to
instead perform the locking using a flag in state.

Due to 6150a1b0 BufferDescs, on platforms (like PPC) with > 1 byte
spinlocks, increased their size above 64byte. As 64 bytes are the size
we pad allocated BufferDescs to, this can increase false sharing;
causing performance problems in turn. Together with the previous commit
this reduces the size to <= 64 bytes on all common platforms.

Author: Andres Freund
Discussion: CAA4eK1+ZeB8PMwwktf+3bRS0Pt4Ux6Rs6Aom0uip8c6shJWmyg@mail.gmail.com
    20160327121858.zrmrjegmji2ymnvr@alap3.anarazel.de
parent 48354581
...@@ -97,6 +97,7 @@ extern slock_t *ShmemLock; ...@@ -97,6 +97,7 @@ extern slock_t *ShmemLock;
#define LW_FLAG_HAS_WAITERS ((uint32) 1 << 30) #define LW_FLAG_HAS_WAITERS ((uint32) 1 << 30)
#define LW_FLAG_RELEASE_OK ((uint32) 1 << 29) #define LW_FLAG_RELEASE_OK ((uint32) 1 << 29)
#define LW_FLAG_LOCKED ((uint32) 1 << 28)
#define LW_VAL_EXCLUSIVE ((uint32) 1 << 24) #define LW_VAL_EXCLUSIVE ((uint32) 1 << 24)
#define LW_VAL_SHARED 1 #define LW_VAL_SHARED 1
...@@ -711,7 +712,6 @@ RequestNamedLWLockTranche(const char *tranche_name, int num_lwlocks) ...@@ -711,7 +712,6 @@ RequestNamedLWLockTranche(const char *tranche_name, int num_lwlocks)
void void
LWLockInitialize(LWLock *lock, int tranche_id) LWLockInitialize(LWLock *lock, int tranche_id)
{ {
SpinLockInit(&lock->mutex);
pg_atomic_init_u32(&lock->state, LW_FLAG_RELEASE_OK); pg_atomic_init_u32(&lock->state, LW_FLAG_RELEASE_OK);
#ifdef LOCK_DEBUG #ifdef LOCK_DEBUG
pg_atomic_init_u32(&lock->nwaiters, 0); pg_atomic_init_u32(&lock->nwaiters, 0);
...@@ -842,6 +842,74 @@ LWLockAttemptLock(LWLock *lock, LWLockMode mode) ...@@ -842,6 +842,74 @@ LWLockAttemptLock(LWLock *lock, LWLockMode mode)
pg_unreachable(); pg_unreachable();
} }
/*
* Lock the LWLock's wait list against concurrent activity.
*
* NB: even though the wait list is locked, non-conflicting lock operations
* may still happen concurrently.
*
* Time spent holding mutex should be short!
*/
static void
LWLockWaitListLock(LWLock *lock)
{
uint32 old_state;
#ifdef LWLOCK_STATS
lwlock_stats *lwstats;
uint32 delays = 0;
lwstats = get_lwlock_stats_entry(lock);
#endif
while (true)
{
/* always try once to acquire lock directly */
old_state = pg_atomic_fetch_or_u32(&lock->state, LW_FLAG_LOCKED);
if (!(old_state & LW_FLAG_LOCKED))
break; /* got lock */
/* and then spin without atomic operations until lock is released */
{
SpinDelayStatus delayStatus = init_spin_delay(&lock->state);
while (old_state & LW_FLAG_LOCKED)
{
perform_spin_delay(&delayStatus);
old_state = pg_atomic_read_u32(&lock->state);
}
#ifdef LWLOCK_STATS
delays += delayStatus.delays;
#endif
finish_spin_delay(&delayStatus);
}
/*
* Retry. The lock might obviously already be re-acquired by the time
* we're attempting to get it again.
*/
}
#ifdef LWLOCK_STATS
lwstats->spin_delay_count += delays;
#endif
}
/*
* Unlock the LWLock's wait list.
*
* Note that it can be more efficient to manipulate flags and release the
* locks in a single atomic operation.
*/
static void
LWLockWaitListUnlock(LWLock *lock)
{
uint32 old_state PG_USED_FOR_ASSERTS_ONLY;
old_state = pg_atomic_fetch_and_u32(&lock->state, ~LW_FLAG_LOCKED);
Assert(old_state & LW_FLAG_LOCKED);
}
/* /*
* Wakeup all the lockers that currently have a chance to acquire the lock. * Wakeup all the lockers that currently have a chance to acquire the lock.
*/ */
...@@ -852,22 +920,13 @@ LWLockWakeup(LWLock *lock) ...@@ -852,22 +920,13 @@ LWLockWakeup(LWLock *lock)
bool wokeup_somebody = false; bool wokeup_somebody = false;
dlist_head wakeup; dlist_head wakeup;
dlist_mutable_iter iter; dlist_mutable_iter iter;
#ifdef LWLOCK_STATS
lwlock_stats *lwstats;
lwstats = get_lwlock_stats_entry(lock);
#endif
dlist_init(&wakeup); dlist_init(&wakeup);
new_release_ok = true; new_release_ok = true;
/* Acquire mutex. Time spent holding mutex should be short! */ /* lock wait list while collecting backends to wake up */
#ifdef LWLOCK_STATS LWLockWaitListLock(lock);
lwstats->spin_delay_count += SpinLockAcquire(&lock->mutex);
#else
SpinLockAcquire(&lock->mutex);
#endif
dlist_foreach_modify(iter, &lock->waiters) dlist_foreach_modify(iter, &lock->waiters)
{ {
...@@ -904,19 +963,33 @@ LWLockWakeup(LWLock *lock) ...@@ -904,19 +963,33 @@ LWLockWakeup(LWLock *lock)
Assert(dlist_is_empty(&wakeup) || pg_atomic_read_u32(&lock->state) & LW_FLAG_HAS_WAITERS); Assert(dlist_is_empty(&wakeup) || pg_atomic_read_u32(&lock->state) & LW_FLAG_HAS_WAITERS);
/* Unset both flags at once if required */ /* unset required flags, and release lock, in one fell swoop */
if (!new_release_ok && dlist_is_empty(&wakeup)) {
pg_atomic_fetch_and_u32(&lock->state, uint32 old_state;
~(LW_FLAG_RELEASE_OK | LW_FLAG_HAS_WAITERS)); uint32 desired_state;
else if (!new_release_ok)
pg_atomic_fetch_and_u32(&lock->state, ~LW_FLAG_RELEASE_OK); old_state = pg_atomic_read_u32(&lock->state);
else if (dlist_is_empty(&wakeup)) while (true)
pg_atomic_fetch_and_u32(&lock->state, ~LW_FLAG_HAS_WAITERS); {
else if (new_release_ok) desired_state = old_state;
pg_atomic_fetch_or_u32(&lock->state, LW_FLAG_RELEASE_OK);
/* compute desired flags */
if (new_release_ok)
desired_state |= LW_FLAG_RELEASE_OK;
else
desired_state &= ~LW_FLAG_RELEASE_OK;
/* We are done updating the shared state of the lock queue. */ if (dlist_is_empty(&wakeup))
SpinLockRelease(&lock->mutex); desired_state &= ~LW_FLAG_HAS_WAITERS;
desired_state &= ~LW_FLAG_LOCKED; /* release lock */
if (pg_atomic_compare_exchange_u32(&lock->state, &old_state,
desired_state))
break;
}
}
/* Awaken any waiters I removed from the queue. */ /* Awaken any waiters I removed from the queue. */
dlist_foreach_modify(iter, &wakeup) dlist_foreach_modify(iter, &wakeup)
...@@ -933,7 +1006,7 @@ LWLockWakeup(LWLock *lock) ...@@ -933,7 +1006,7 @@ LWLockWakeup(LWLock *lock)
* that happens before the list unlink happens, the list would end up * that happens before the list unlink happens, the list would end up
* being corrupted. * being corrupted.
* *
* The barrier pairs with the SpinLockAcquire() when enqueing for * The barrier pairs with the LWLockWaitListLock() when enqueing for
* another lock. * another lock.
*/ */
pg_write_barrier(); pg_write_barrier();
...@@ -950,12 +1023,6 @@ LWLockWakeup(LWLock *lock) ...@@ -950,12 +1023,6 @@ LWLockWakeup(LWLock *lock)
static void static void
LWLockQueueSelf(LWLock *lock, LWLockMode mode) LWLockQueueSelf(LWLock *lock, LWLockMode mode)
{ {
#ifdef LWLOCK_STATS
lwlock_stats *lwstats;
lwstats = get_lwlock_stats_entry(lock);
#endif
/* /*
* If we don't have a PGPROC structure, there's no way to wait. This * If we don't have a PGPROC structure, there's no way to wait. This
* should never occur, since MyProc should only be null during shared * should never occur, since MyProc should only be null during shared
...@@ -967,11 +1034,7 @@ LWLockQueueSelf(LWLock *lock, LWLockMode mode) ...@@ -967,11 +1034,7 @@ LWLockQueueSelf(LWLock *lock, LWLockMode mode)
if (MyProc->lwWaiting) if (MyProc->lwWaiting)
elog(PANIC, "queueing for lock while waiting on another one"); elog(PANIC, "queueing for lock while waiting on another one");
#ifdef LWLOCK_STATS LWLockWaitListLock(lock);
lwstats->spin_delay_count += SpinLockAcquire(&lock->mutex);
#else
SpinLockAcquire(&lock->mutex);
#endif
/* setting the flag is protected by the spinlock */ /* setting the flag is protected by the spinlock */
pg_atomic_fetch_or_u32(&lock->state, LW_FLAG_HAS_WAITERS); pg_atomic_fetch_or_u32(&lock->state, LW_FLAG_HAS_WAITERS);
...@@ -986,7 +1049,7 @@ LWLockQueueSelf(LWLock *lock, LWLockMode mode) ...@@ -986,7 +1049,7 @@ LWLockQueueSelf(LWLock *lock, LWLockMode mode)
dlist_push_tail(&lock->waiters, &MyProc->lwWaitLink); dlist_push_tail(&lock->waiters, &MyProc->lwWaitLink);
/* Can release the mutex now */ /* Can release the mutex now */
SpinLockRelease(&lock->mutex); LWLockWaitListUnlock(lock);
#ifdef LOCK_DEBUG #ifdef LOCK_DEBUG
pg_atomic_fetch_add_u32(&lock->nwaiters, 1); pg_atomic_fetch_add_u32(&lock->nwaiters, 1);
...@@ -1015,11 +1078,7 @@ LWLockDequeueSelf(LWLock *lock) ...@@ -1015,11 +1078,7 @@ LWLockDequeueSelf(LWLock *lock)
lwstats->dequeue_self_count++; lwstats->dequeue_self_count++;
#endif #endif
#ifdef LWLOCK_STATS LWLockWaitListLock(lock);
lwstats->spin_delay_count += SpinLockAcquire(&lock->mutex);
#else
SpinLockAcquire(&lock->mutex);
#endif
/* /*
* Can't just remove ourselves from the list, but we need to iterate over * Can't just remove ourselves from the list, but we need to iterate over
...@@ -1043,7 +1102,8 @@ LWLockDequeueSelf(LWLock *lock) ...@@ -1043,7 +1102,8 @@ LWLockDequeueSelf(LWLock *lock)
pg_atomic_fetch_and_u32(&lock->state, ~LW_FLAG_HAS_WAITERS); pg_atomic_fetch_and_u32(&lock->state, ~LW_FLAG_HAS_WAITERS);
} }
SpinLockRelease(&lock->mutex); /* XXX: combine with fetch_and above? */
LWLockWaitListUnlock(lock);
/* clear waiting state again, nice for debugging */ /* clear waiting state again, nice for debugging */
if (found) if (found)
...@@ -1460,11 +1520,6 @@ LWLockConflictsWithVar(LWLock *lock, ...@@ -1460,11 +1520,6 @@ LWLockConflictsWithVar(LWLock *lock,
{ {
bool mustwait; bool mustwait;
uint64 value; uint64 value;
#ifdef LWLOCK_STATS
lwlock_stats *lwstats;
lwstats = get_lwlock_stats_entry(lock);
#endif
/* /*
* Test first to see if it the slot is free right now. * Test first to see if it the slot is free right now.
...@@ -1484,17 +1539,13 @@ LWLockConflictsWithVar(LWLock *lock, ...@@ -1484,17 +1539,13 @@ LWLockConflictsWithVar(LWLock *lock,
*result = false; *result = false;
/* /*
* Read value using spinlock as we can't rely on atomic 64 bit * Read value using the lwlock's wait list lock, as we can't generally
* reads/stores. TODO: On platforms with a way to do atomic 64 bit * rely on atomic 64 bit reads/stores. TODO: On platforms with a way to
* reads/writes the spinlock could be optimized away. * do atomic 64 bit reads/writes the spinlock should be optimized away.
*/ */
#ifdef LWLOCK_STATS LWLockWaitListLock(lock);
lwstats->spin_delay_count += SpinLockAcquire(&lock->mutex);
#else
SpinLockAcquire(&lock->mutex);
#endif
value = *valptr; value = *valptr;
SpinLockRelease(&lock->mutex); LWLockWaitListUnlock(lock);
if (value != oldval) if (value != oldval)
{ {
...@@ -1668,22 +1719,12 @@ LWLockUpdateVar(LWLock *lock, uint64 *valptr, uint64 val) ...@@ -1668,22 +1719,12 @@ LWLockUpdateVar(LWLock *lock, uint64 *valptr, uint64 val)
{ {
dlist_head wakeup; dlist_head wakeup;
dlist_mutable_iter iter; dlist_mutable_iter iter;
#ifdef LWLOCK_STATS
lwlock_stats *lwstats;
lwstats = get_lwlock_stats_entry(lock);
#endif
PRINT_LWDEBUG("LWLockUpdateVar", lock, LW_EXCLUSIVE); PRINT_LWDEBUG("LWLockUpdateVar", lock, LW_EXCLUSIVE);
dlist_init(&wakeup); dlist_init(&wakeup);
/* Acquire mutex. Time spent holding mutex should be short! */ LWLockWaitListLock(lock);
#ifdef LWLOCK_STATS
lwstats->spin_delay_count += SpinLockAcquire(&lock->mutex);
#else
SpinLockAcquire(&lock->mutex);
#endif
Assert(pg_atomic_read_u32(&lock->state) & LW_VAL_EXCLUSIVE); Assert(pg_atomic_read_u32(&lock->state) & LW_VAL_EXCLUSIVE);
...@@ -1706,7 +1747,7 @@ LWLockUpdateVar(LWLock *lock, uint64 *valptr, uint64 val) ...@@ -1706,7 +1747,7 @@ LWLockUpdateVar(LWLock *lock, uint64 *valptr, uint64 val)
} }
/* We are done updating shared state of the lock itself. */ /* We are done updating shared state of the lock itself. */
SpinLockRelease(&lock->mutex); LWLockWaitListUnlock(lock);
/* /*
* Awaken any waiters I removed from the queue. * Awaken any waiters I removed from the queue.
...@@ -1804,21 +1845,15 @@ LWLockRelease(LWLock *lock) ...@@ -1804,21 +1845,15 @@ LWLockRelease(LWLock *lock)
void void
LWLockReleaseClearVar(LWLock *lock, uint64 *valptr, uint64 val) LWLockReleaseClearVar(LWLock *lock, uint64 *valptr, uint64 val)
{ {
#ifdef LWLOCK_STATS LWLockWaitListLock(lock);
lwlock_stats *lwstats;
lwstats = get_lwlock_stats_entry(lock);
lwstats->spin_delay_count += SpinLockAcquire(&lock->mutex);
#else
SpinLockAcquire(&lock->mutex);
#endif
/* /*
* Set the variable's value before releasing the lock, that prevents race * Set the variable's value before releasing the lock, that prevents race
* a race condition wherein a new locker acquires the lock, but hasn't yet * a race condition wherein a new locker acquires the lock, but hasn't yet
* set the variables value. * set the variables value.
*/ */
*valptr = val; *valptr = val;
SpinLockRelease(&lock->mutex); LWLockWaitListUnlock(lock);
LWLockRelease(lock); LWLockRelease(lock);
} }
......
...@@ -57,15 +57,11 @@ typedef struct LWLockTranche ...@@ -57,15 +57,11 @@ typedef struct LWLockTranche
*/ */
typedef struct LWLock typedef struct LWLock
{ {
slock_t mutex; /* Protects LWLock and queue of PGPROCs */
uint16 tranche; /* tranche ID */ uint16 tranche; /* tranche ID */
pg_atomic_uint32 state; /* state of exclusive/nonexclusive lockers */ pg_atomic_uint32 state; /* state of exclusive/nonexclusive lockers */
#ifdef LOCK_DEBUG
pg_atomic_uint32 nwaiters; /* number of waiters */
#endif
dlist_head waiters; /* list of waiting PGPROCs */ dlist_head waiters; /* list of waiting PGPROCs */
#ifdef LOCK_DEBUG #ifdef LOCK_DEBUG
pg_atomic_uint32 nwaiters; /* number of waiters */
struct PGPROC *owner; /* last exclusive owner of the lock */ struct PGPROC *owner; /* last exclusive owner of the lock */
#endif #endif
} LWLock; } LWLock;
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment