Commit 70397601 authored by Andres Freund's avatar Andres Freund

Fix issues around the "variable" support in the lwlock infrastructure.

The lwlock scalability work introduced two race conditions into the
lwlock variable support provided for xlog.c. First, and harmlessly on
most platforms, it set/read the variable without the spinlock in some
places. Secondly, due to the removal of the spinlock, it was possible
that a backend missed changes to the variable's state if it changed in
the wrong moment because checking the lock's state, the variable's state
and the queuing are not protected by a single spinlock acquisition
anymore.

To fix first move resetting the variable's from LWLockAcquireWithVar to
WALInsertLockRelease, via a new function LWLockReleaseClearVar. That
prevents issues around waiting for a variable's value to change when a
new locker has acquired the lock, but not yet set the value. Secondly
re-check that the variable hasn't changed after enqueing, that prevents
the issue that the lock has been released and already re-acquired by the
time the woken up backend checks for the lock's state.

Reported-By: Jeff Janes
Analyzed-By: Heikki Linnakangas
Reviewed-By: Heikki Linnakangas
Discussion: 5592DB35.2060401@iki.fi
Backpatch: 9.5, where the lwlock scalability went in
parent f69b4b94
...@@ -1408,9 +1408,7 @@ WALInsertLockAcquire(void) ...@@ -1408,9 +1408,7 @@ WALInsertLockAcquire(void)
* The insertingAt value is initially set to 0, as we don't know our * The insertingAt value is initially set to 0, as we don't know our
* insert location yet. * insert location yet.
*/ */
immed = LWLockAcquireWithVar(&WALInsertLocks[MyLockNo].l.lock, immed = LWLockAcquire(&WALInsertLocks[MyLockNo].l.lock, LW_EXCLUSIVE);
&WALInsertLocks[MyLockNo].l.insertingAt,
0);
if (!immed) if (!immed)
{ {
/* /*
...@@ -1435,26 +1433,28 @@ WALInsertLockAcquireExclusive(void) ...@@ -1435,26 +1433,28 @@ WALInsertLockAcquireExclusive(void)
int i; int i;
/* /*
* When holding all the locks, we only update the last lock's insertingAt * When holding all the locks, all but the last lock's insertingAt
* indicator. The others are set to 0xFFFFFFFFFFFFFFFF, which is higher * indicator is set to 0xFFFFFFFFFFFFFFFF, which is higher than any real
* than any real XLogRecPtr value, to make sure that no-one blocks waiting * XLogRecPtr value, to make sure that no-one blocks waiting on those.
* on those.
*/ */
for (i = 0; i < NUM_XLOGINSERT_LOCKS - 1; i++) for (i = 0; i < NUM_XLOGINSERT_LOCKS - 1; i++)
{ {
LWLockAcquireWithVar(&WALInsertLocks[i].l.lock, LWLockAcquire(&WALInsertLocks[i].l.lock, LW_EXCLUSIVE);
&WALInsertLocks[i].l.insertingAt, LWLockUpdateVar(&WALInsertLocks[i].l.lock,
PG_UINT64_MAX); &WALInsertLocks[i].l.insertingAt,
PG_UINT64_MAX);
} }
LWLockAcquireWithVar(&WALInsertLocks[i].l.lock, /* Variable value reset to 0 at release */
&WALInsertLocks[i].l.insertingAt, LWLockAcquire(&WALInsertLocks[i].l.lock, LW_EXCLUSIVE);
0);
holdingAllLocks = true; holdingAllLocks = true;
} }
/* /*
* Release our insertion lock (or locks, if we're holding them all). * Release our insertion lock (or locks, if we're holding them all).
*
* NB: Reset all variables to 0, so they cause LWLockWaitForVar to block the
* next time the lock is acquired.
*/ */
static void static void
WALInsertLockRelease(void) WALInsertLockRelease(void)
...@@ -1464,13 +1464,17 @@ WALInsertLockRelease(void) ...@@ -1464,13 +1464,17 @@ WALInsertLockRelease(void)
int i; int i;
for (i = 0; i < NUM_XLOGINSERT_LOCKS; i++) for (i = 0; i < NUM_XLOGINSERT_LOCKS; i++)
LWLockRelease(&WALInsertLocks[i].l.lock); LWLockReleaseClearVar(&WALInsertLocks[i].l.lock,
&WALInsertLocks[i].l.insertingAt,
0);
holdingAllLocks = false; holdingAllLocks = false;
} }
else else
{ {
LWLockRelease(&WALInsertLocks[MyLockNo].l.lock); LWLockReleaseClearVar(&WALInsertLocks[MyLockNo].l.lock,
&WALInsertLocks[MyLockNo].l.insertingAt,
0);
} }
} }
......
...@@ -10,13 +10,15 @@ ...@@ -10,13 +10,15 @@
* locking should be done with the full lock manager --- which depends on * locking should be done with the full lock manager --- which depends on
* LWLocks to protect its shared state. * LWLocks to protect its shared state.
* *
* In addition to exclusive and shared modes, lightweight locks can be used * In addition to exclusive and shared modes, lightweight locks can be used to
* to wait until a variable changes value. The variable is initially set * wait until a variable changes value. The variable is initially not set
* when the lock is acquired with LWLockAcquireWithVar, and can be updated * when the lock is acquired with LWLockAcquire, i.e. it remains set to the
* value it was set to when the lock was released last, and can be updated
* without releasing the lock by calling LWLockUpdateVar. LWLockWaitForVar * without releasing the lock by calling LWLockUpdateVar. LWLockWaitForVar
* waits for the variable to be updated, or until the lock is free. The * waits for the variable to be updated, or until the lock is free. When
* meaning of the variable is up to the caller, the lightweight lock code * releasing the lock with LWLockReleaseClearVar() the value can be set to an
* just assigns and compares it. * appropriate value for a free lock. The meaning of the variable is up to
* the caller, the lightweight lock code just assigns and compares it.
* *
* Portions Copyright (c) 1996-2015, PostgreSQL Global Development Group * Portions Copyright (c) 1996-2015, PostgreSQL Global Development Group
* Portions Copyright (c) 1994, Regents of the University of California * Portions Copyright (c) 1994, Regents of the University of California
...@@ -150,9 +152,6 @@ static LWLockHandle held_lwlocks[MAX_SIMUL_LWLOCKS]; ...@@ -150,9 +152,6 @@ static LWLockHandle held_lwlocks[MAX_SIMUL_LWLOCKS];
static int lock_addin_request = 0; static int lock_addin_request = 0;
static bool lock_addin_request_allowed = true; static bool lock_addin_request_allowed = true;
static inline bool LWLockAcquireCommon(LWLock *l, LWLockMode mode,
uint64 *valptr, uint64 val);
#ifdef LWLOCK_STATS #ifdef LWLOCK_STATS
typedef struct lwlock_stats_key typedef struct lwlock_stats_key
{ {
...@@ -899,25 +898,7 @@ LWLockDequeueSelf(LWLock *lock) ...@@ -899,25 +898,7 @@ LWLockDequeueSelf(LWLock *lock)
* Side effect: cancel/die interrupts are held off until lock release. * Side effect: cancel/die interrupts are held off until lock release.
*/ */
bool bool
LWLockAcquire(LWLock *l, LWLockMode mode) LWLockAcquire(LWLock *lock, LWLockMode mode)
{
return LWLockAcquireCommon(l, mode, NULL, 0);
}
/*
* LWLockAcquireWithVar - like LWLockAcquire, but also sets *valptr = val
*
* The lock is always acquired in exclusive mode with this function.
*/
bool
LWLockAcquireWithVar(LWLock *l, uint64 *valptr, uint64 val)
{
return LWLockAcquireCommon(l, LW_EXCLUSIVE, valptr, val);
}
/* internal function to implement LWLockAcquire and LWLockAcquireWithVar */
static inline bool
LWLockAcquireCommon(LWLock *lock, LWLockMode mode, uint64 *valptr, uint64 val)
{ {
PGPROC *proc = MyProc; PGPROC *proc = MyProc;
bool result = true; bool result = true;
...@@ -1064,10 +1045,6 @@ LWLockAcquireCommon(LWLock *lock, LWLockMode mode, uint64 *valptr, uint64 val) ...@@ -1064,10 +1045,6 @@ LWLockAcquireCommon(LWLock *lock, LWLockMode mode, uint64 *valptr, uint64 val)
result = false; result = false;
} }
/* If there's a variable associated with this lock, initialize it */
if (valptr)
*valptr = val;
TRACE_POSTGRESQL_LWLOCK_ACQUIRE(T_NAME(lock), T_ID(lock), mode); TRACE_POSTGRESQL_LWLOCK_ACQUIRE(T_NAME(lock), T_ID(lock), mode);
/* Add lock to list of locks held by this backend */ /* Add lock to list of locks held by this backend */
...@@ -1258,6 +1235,71 @@ LWLockAcquireOrWait(LWLock *lock, LWLockMode mode) ...@@ -1258,6 +1235,71 @@ LWLockAcquireOrWait(LWLock *lock, LWLockMode mode)
return !mustwait; return !mustwait;
} }
/*
* Does the lwlock in its current state need to wait for the variable value to
* change?
*
* If we don't need to wait, and it's because the value of the variable has
* changed, store the current value in newval.
*
* *result is set to true if the lock was free, and false otherwise.
*/
static bool
LWLockConflictsWithVar(LWLock *lock,
uint64 *valptr, uint64 oldval, uint64 *newval,
bool *result)
{
bool mustwait;
uint64 value;
#ifdef LWLOCK_STATS
lwlock_stats *lwstats;
lwstats = get_lwlock_stats_entry(lock);
#endif
/*
* Test first to see if it the slot is free right now.
*
* XXX: the caller uses a spinlock before this, so we don't need a memory
* barrier here as far as the current usage is concerned. But that might
* not be safe in general.
*/
mustwait = (pg_atomic_read_u32(&lock->state) & LW_VAL_EXCLUSIVE) != 0;
if (!mustwait)
{
*result = true;
return false;
}
*result = false;
/*
* Read value using spinlock as we can't rely on atomic 64 bit
* reads/stores. TODO: On platforms with a way to do atomic 64 bit
* reads/writes the spinlock could be optimized away.
*/
#ifdef LWLOCK_STATS
lwstats->spin_delay_count += SpinLockAcquire(&lock->mutex);
#else
SpinLockAcquire(&lock->mutex);
#endif
value = *valptr;
SpinLockRelease(&lock->mutex);
if (value != oldval)
{
mustwait = false;
*newval = value;
}
else
{
mustwait = true;
}
return mustwait;
}
/* /*
* LWLockWaitForVar - Wait until lock is free, or a variable is updated. * LWLockWaitForVar - Wait until lock is free, or a variable is updated.
* *
...@@ -1268,11 +1310,6 @@ LWLockAcquireOrWait(LWLock *lock, LWLockMode mode) ...@@ -1268,11 +1310,6 @@ LWLockAcquireOrWait(LWLock *lock, LWLockMode mode)
* matches oldval, returns false and sets *newval to the current value in * matches oldval, returns false and sets *newval to the current value in
* *valptr. * *valptr.
* *
* It's possible that the lock holder releases the lock, but another backend
* acquires it again before we get a chance to observe that the lock was
* momentarily released. We wouldn't need to wait for the new lock holder,
* but we cannot distinguish that case, so we will have to wait.
*
* Note: this function ignores shared lock holders; if the lock is held * Note: this function ignores shared lock holders; if the lock is held
* in shared mode, returns 'true'. * in shared mode, returns 'true'.
*/ */
...@@ -1290,16 +1327,6 @@ LWLockWaitForVar(LWLock *lock, uint64 *valptr, uint64 oldval, uint64 *newval) ...@@ -1290,16 +1327,6 @@ LWLockWaitForVar(LWLock *lock, uint64 *valptr, uint64 oldval, uint64 *newval)
PRINT_LWDEBUG("LWLockWaitForVar", lock, LW_WAIT_UNTIL_FREE); PRINT_LWDEBUG("LWLockWaitForVar", lock, LW_WAIT_UNTIL_FREE);
/*
* Quick test first to see if it the slot is free right now.
*
* XXX: the caller uses a spinlock before this, so we don't need a memory
* barrier here as far as the current usage is concerned. But that might
* not be safe in general.
*/
if ((pg_atomic_read_u32(&lock->state) & LW_VAL_EXCLUSIVE) == 0)
return true;
/* /*
* Lock out cancel/die interrupts while we sleep on the lock. There is no * Lock out cancel/die interrupts while we sleep on the lock. There is no
* cleanup mechanism to remove us from the wait queue if we got * cleanup mechanism to remove us from the wait queue if we got
...@@ -1313,39 +1340,9 @@ LWLockWaitForVar(LWLock *lock, uint64 *valptr, uint64 oldval, uint64 *newval) ...@@ -1313,39 +1340,9 @@ LWLockWaitForVar(LWLock *lock, uint64 *valptr, uint64 oldval, uint64 *newval)
for (;;) for (;;)
{ {
bool mustwait; bool mustwait;
uint64 value;
mustwait = (pg_atomic_read_u32(&lock->state) & LW_VAL_EXCLUSIVE) != 0;
if (mustwait)
{
/*
* Perform comparison using spinlock as we can't rely on atomic 64
* bit reads/stores.
*/
#ifdef LWLOCK_STATS
lwstats->spin_delay_count += SpinLockAcquire(&lock->mutex);
#else
SpinLockAcquire(&lock->mutex);
#endif
/* mustwait = LWLockConflictsWithVar(lock, valptr, oldval, newval,
* XXX: We can significantly optimize this on platforms with 64bit &result);
* atomics.
*/
value = *valptr;
if (value != oldval)
{
result = false;
mustwait = false;
*newval = value;
}
else
mustwait = true;
SpinLockRelease(&lock->mutex);
}
else
mustwait = false;
if (!mustwait) if (!mustwait)
break; /* the lock was free or value didn't match */ break; /* the lock was free or value didn't match */
...@@ -1354,7 +1351,9 @@ LWLockWaitForVar(LWLock *lock, uint64 *valptr, uint64 oldval, uint64 *newval) ...@@ -1354,7 +1351,9 @@ LWLockWaitForVar(LWLock *lock, uint64 *valptr, uint64 oldval, uint64 *newval)
* Add myself to wait queue. Note that this is racy, somebody else * Add myself to wait queue. Note that this is racy, somebody else
* could wakeup before we're finished queuing. NB: We're using nearly * could wakeup before we're finished queuing. NB: We're using nearly
* the same twice-in-a-row lock acquisition protocol as * the same twice-in-a-row lock acquisition protocol as
* LWLockAcquire(). Check its comments for details. * LWLockAcquire(). Check its comments for details. The only
* difference is that we also have to check the variable's values when
* checking the state of the lock.
*/ */
LWLockQueueSelf(lock, LW_WAIT_UNTIL_FREE); LWLockQueueSelf(lock, LW_WAIT_UNTIL_FREE);
...@@ -1365,12 +1364,13 @@ LWLockWaitForVar(LWLock *lock, uint64 *valptr, uint64 oldval, uint64 *newval) ...@@ -1365,12 +1364,13 @@ LWLockWaitForVar(LWLock *lock, uint64 *valptr, uint64 oldval, uint64 *newval)
pg_atomic_fetch_or_u32(&lock->state, LW_FLAG_RELEASE_OK); pg_atomic_fetch_or_u32(&lock->state, LW_FLAG_RELEASE_OK);
/* /*
* We're now guaranteed to be woken up if necessary. Recheck the * We're now guaranteed to be woken up if necessary. Recheck the lock
* lock's state. * and variables state.
*/ */
mustwait = (pg_atomic_read_u32(&lock->state) & LW_VAL_EXCLUSIVE) != 0; mustwait = LWLockConflictsWithVar(lock, valptr, oldval, newval,
&result);
/* Ok, lock is free after we queued ourselves. Undo queueing. */ /* Ok, no conflict after we queued ourselves. Undo queueing. */
if (!mustwait) if (!mustwait)
{ {
LOG_LWDEBUG("LWLockWaitForVar", lock, "free, undoing queue"); LOG_LWDEBUG("LWLockWaitForVar", lock, "free, undoing queue");
...@@ -1587,6 +1587,31 @@ LWLockRelease(LWLock *lock) ...@@ -1587,6 +1587,31 @@ LWLockRelease(LWLock *lock)
RESUME_INTERRUPTS(); RESUME_INTERRUPTS();
} }
/*
* LWLockReleaseClearVar - release a previously acquired lock, reset variable
*/
void
LWLockReleaseClearVar(LWLock *lock, uint64 *valptr, uint64 val)
{
#ifdef LWLOCK_STATS
lwlock_stats *lwstats;
lwstats = get_lwlock_stats_entry(lock);
lwstats->spin_delay_count += SpinLockAcquire(&lock->mutex);
#else
SpinLockAcquire(&lock->mutex);
#endif
/*
* Set the variable's value before releasing the lock, that prevents race
* a race condition wherein a new locker acquires the lock, but hasn't yet
* set the variables value.
*/
*valptr = val;
SpinLockRelease(&lock->mutex);
LWLockRelease(lock);
}
/* /*
* LWLockReleaseAll - release all currently-held locks * LWLockReleaseAll - release all currently-held locks
......
...@@ -182,10 +182,10 @@ extern bool LWLockAcquire(LWLock *lock, LWLockMode mode); ...@@ -182,10 +182,10 @@ extern bool LWLockAcquire(LWLock *lock, LWLockMode mode);
extern bool LWLockConditionalAcquire(LWLock *lock, LWLockMode mode); extern bool LWLockConditionalAcquire(LWLock *lock, LWLockMode mode);
extern bool LWLockAcquireOrWait(LWLock *lock, LWLockMode mode); extern bool LWLockAcquireOrWait(LWLock *lock, LWLockMode mode);
extern void LWLockRelease(LWLock *lock); extern void LWLockRelease(LWLock *lock);
extern void LWLockReleaseClearVar(LWLock *lock, uint64 *valptr, uint64 val);
extern void LWLockReleaseAll(void); extern void LWLockReleaseAll(void);
extern bool LWLockHeldByMe(LWLock *lock); extern bool LWLockHeldByMe(LWLock *lock);
extern bool LWLockAcquireWithVar(LWLock *lock, uint64 *valptr, uint64 val);
extern bool LWLockWaitForVar(LWLock *lock, uint64 *valptr, uint64 oldval, uint64 *newval); extern bool LWLockWaitForVar(LWLock *lock, uint64 *valptr, uint64 oldval, uint64 *newval);
extern void LWLockUpdateVar(LWLock *lock, uint64 *valptr, uint64 value); extern void LWLockUpdateVar(LWLock *lock, uint64 *valptr, uint64 value);
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment