Commit 53c5b869 authored by Robert Haas's avatar Robert Haas

Tighten up error recovery for fast-path locking.

The previous code could cause a backend crash after BEGIN; SAVEPOINT a;
LOCK TABLE foo (interrupted by ^C or statement timeout); ROLLBACK TO
SAVEPOINT a; LOCK TABLE foo, and might have leaked strong-lock counts
in other situations.

Report by Zoltán Böszörményi; patch review by Jeff Davis.
parent ab77b2da
...@@ -2259,7 +2259,7 @@ AbortTransaction(void) ...@@ -2259,7 +2259,7 @@ AbortTransaction(void)
* Also clean up any open wait for lock, since the lock manager will choke * Also clean up any open wait for lock, since the lock manager will choke
* if we try to wait for another lock before doing this. * if we try to wait for another lock before doing this.
*/ */
LockWaitCancel(); LockErrorCleanup();
/* /*
* check the current transaction state * check the current transaction state
...@@ -4144,7 +4144,7 @@ AbortSubTransaction(void) ...@@ -4144,7 +4144,7 @@ AbortSubTransaction(void)
AbortBufferIO(); AbortBufferIO();
UnlockBuffers(); UnlockBuffers();
LockWaitCancel(); LockErrorCleanup();
/* /*
* check the current transaction state * check the current transaction state
......
...@@ -560,7 +560,7 @@ examines every member of the wait queue (this was not true in the 7.0 ...@@ -560,7 +560,7 @@ examines every member of the wait queue (this was not true in the 7.0
implementation, BTW). Therefore, if ProcLockWakeup is always invoked implementation, BTW). Therefore, if ProcLockWakeup is always invoked
after a lock is released or a wait queue is rearranged, there can be no after a lock is released or a wait queue is rearranged, there can be no
failure to wake a wakable process. One should also note that failure to wake a wakable process. One should also note that
LockWaitCancel (abort a waiter due to outside factors) must run LockErrorCleanup (abort a waiter due to outside factors) must run
ProcLockWakeup, in case the canceled waiter was soft-blocking other ProcLockWakeup, in case the canceled waiter was soft-blocking other
waiters. waiters.
......
...@@ -250,7 +250,8 @@ static HTAB *LockMethodProcLockHash; ...@@ -250,7 +250,8 @@ static HTAB *LockMethodProcLockHash;
static HTAB *LockMethodLocalHash; static HTAB *LockMethodLocalHash;
/* private state for GrantAwaitedLock */ /* private state for error cleanup */
static LOCALLOCK *StrongLockInProgress;
static LOCALLOCK *awaitedLock; static LOCALLOCK *awaitedLock;
static ResourceOwner awaitedOwner; static ResourceOwner awaitedOwner;
...@@ -338,6 +339,8 @@ static void RemoveLocalLock(LOCALLOCK *locallock); ...@@ -338,6 +339,8 @@ static void RemoveLocalLock(LOCALLOCK *locallock);
static PROCLOCK *SetupLockInTable(LockMethod lockMethodTable, PGPROC *proc, static PROCLOCK *SetupLockInTable(LockMethod lockMethodTable, PGPROC *proc,
const LOCKTAG *locktag, uint32 hashcode, LOCKMODE lockmode); const LOCKTAG *locktag, uint32 hashcode, LOCKMODE lockmode);
static void GrantLockLocal(LOCALLOCK *locallock, ResourceOwner owner); static void GrantLockLocal(LOCALLOCK *locallock, ResourceOwner owner);
static void BeginStrongLockAcquire(LOCALLOCK *locallock, uint32 fasthashcode);
static void FinishStrongLockAcquire(void);
static void WaitOnLock(LOCALLOCK *locallock, ResourceOwner owner); static void WaitOnLock(LOCALLOCK *locallock, ResourceOwner owner);
static void ReleaseLockForOwner(LOCALLOCK *locallock, ResourceOwner owner); static void ReleaseLockForOwner(LOCALLOCK *locallock, ResourceOwner owner);
static bool UnGrantLock(LOCK *lock, LOCKMODE lockmode, static bool UnGrantLock(LOCK *lock, LOCKMODE lockmode,
...@@ -738,22 +741,11 @@ LockAcquireExtended(const LOCKTAG *locktag, ...@@ -738,22 +741,11 @@ LockAcquireExtended(const LOCKTAG *locktag,
} }
else if (FastPathStrongMode(lockmode)) else if (FastPathStrongMode(lockmode))
{ {
/* BeginStrongLockAcquire(locallock, fasthashcode);
* Adding to a memory location is not atomic, so we take a
* spinlock to ensure we don't collide with someone else trying
* to bump the count at the same time.
*
* XXX: It might be worth considering using an atomic fetch-and-add
* instruction here, on architectures where that is supported.
*/
Assert(locallock->holdsStrongLockCount == FALSE);
SpinLockAcquire(&FastPathStrongRelationLocks->mutex);
FastPathStrongRelationLocks->count[fasthashcode]++;
locallock->holdsStrongLockCount = TRUE;
SpinLockRelease(&FastPathStrongRelationLocks->mutex);
if (!FastPathTransferRelationLocks(lockMethodTable, locktag, if (!FastPathTransferRelationLocks(lockMethodTable, locktag,
hashcode)) hashcode))
{ {
AbortStrongLockAcquire();
if (reportMemoryError) if (reportMemoryError)
ereport(ERROR, ereport(ERROR,
(errcode(ERRCODE_OUT_OF_MEMORY), (errcode(ERRCODE_OUT_OF_MEMORY),
...@@ -779,6 +771,7 @@ LockAcquireExtended(const LOCKTAG *locktag, ...@@ -779,6 +771,7 @@ LockAcquireExtended(const LOCKTAG *locktag,
hashcode, lockmode); hashcode, lockmode);
if (!proclock) if (!proclock)
{ {
AbortStrongLockAcquire();
LWLockRelease(partitionLock); LWLockRelease(partitionLock);
if (reportMemoryError) if (reportMemoryError)
ereport(ERROR, ereport(ERROR,
...@@ -820,6 +813,7 @@ LockAcquireExtended(const LOCKTAG *locktag, ...@@ -820,6 +813,7 @@ LockAcquireExtended(const LOCKTAG *locktag,
*/ */
if (dontWait) if (dontWait)
{ {
AbortStrongLockAcquire();
if (proclock->holdMask == 0) if (proclock->holdMask == 0)
{ {
uint32 proclock_hashcode; uint32 proclock_hashcode;
...@@ -884,6 +878,7 @@ LockAcquireExtended(const LOCKTAG *locktag, ...@@ -884,6 +878,7 @@ LockAcquireExtended(const LOCKTAG *locktag,
*/ */
if (!(proclock->holdMask & LOCKBIT_ON(lockmode))) if (!(proclock->holdMask & LOCKBIT_ON(lockmode)))
{ {
AbortStrongLockAcquire();
PROCLOCK_PRINT("LockAcquire: INCONSISTENT", proclock); PROCLOCK_PRINT("LockAcquire: INCONSISTENT", proclock);
LOCK_PRINT("LockAcquire: INCONSISTENT", lock, lockmode); LOCK_PRINT("LockAcquire: INCONSISTENT", lock, lockmode);
/* Should we retry ? */ /* Should we retry ? */
...@@ -894,6 +889,12 @@ LockAcquireExtended(const LOCKTAG *locktag, ...@@ -894,6 +889,12 @@ LockAcquireExtended(const LOCKTAG *locktag,
LOCK_PRINT("LockAcquire: granted", lock, lockmode); LOCK_PRINT("LockAcquire: granted", lock, lockmode);
} }
/*
* Lock state is fully up-to-date now; if we error out after this, no
* special error cleanup is required.
*/
FinishStrongLockAcquire();
LWLockRelease(partitionLock); LWLockRelease(partitionLock);
/* /*
...@@ -1349,6 +1350,64 @@ GrantLockLocal(LOCALLOCK *locallock, ResourceOwner owner) ...@@ -1349,6 +1350,64 @@ GrantLockLocal(LOCALLOCK *locallock, ResourceOwner owner)
locallock->numLockOwners++; locallock->numLockOwners++;
} }
/*
* BeginStrongLockAcquire - inhibit use of fastpath for a given LOCALLOCK,
* and arrange for error cleanup if it fails
*/
static void
BeginStrongLockAcquire(LOCALLOCK *locallock, uint32 fasthashcode)
{
Assert(StrongLockInProgress == NULL);
Assert(locallock->holdsStrongLockCount == FALSE);
/*
* Adding to a memory location is not atomic, so we take a
* spinlock to ensure we don't collide with someone else trying
* to bump the count at the same time.
*
* XXX: It might be worth considering using an atomic fetch-and-add
* instruction here, on architectures where that is supported.
*/
SpinLockAcquire(&FastPathStrongRelationLocks->mutex);
FastPathStrongRelationLocks->count[fasthashcode]++;
locallock->holdsStrongLockCount = TRUE;
StrongLockInProgress = locallock;
SpinLockRelease(&FastPathStrongRelationLocks->mutex);
}
/*
* FinishStrongLockAcquire - cancel pending cleanup for a strong lock
* acquisition once it's no longer needed
*/
static void
FinishStrongLockAcquire(void)
{
StrongLockInProgress = NULL;
}
/*
* AbortStrongLockAcquire - undo strong lock state changes performed by
* BeginStrongLockAcquire.
*/
void
AbortStrongLockAcquire(void)
{
uint32 fasthashcode;
LOCALLOCK *locallock = StrongLockInProgress;
if (locallock == NULL)
return;
fasthashcode = FastPathStrongLockHashPartition(locallock->hashcode);
Assert(locallock->holdsStrongLockCount == TRUE);
SpinLockAcquire(&FastPathStrongRelationLocks->mutex);
FastPathStrongRelationLocks->count[fasthashcode]--;
locallock->holdsStrongLockCount = FALSE;
StrongLockInProgress = NULL;
SpinLockRelease(&FastPathStrongRelationLocks->mutex);
}
/* /*
* GrantAwaitedLock -- call GrantLockLocal for the lock we are doing * GrantAwaitedLock -- call GrantLockLocal for the lock we are doing
* WaitOnLock on. * WaitOnLock on.
...@@ -1414,7 +1473,7 @@ WaitOnLock(LOCALLOCK *locallock, ResourceOwner owner) ...@@ -1414,7 +1473,7 @@ WaitOnLock(LOCALLOCK *locallock, ResourceOwner owner)
* We can and do use a PG_TRY block to try to clean up after failure, but * We can and do use a PG_TRY block to try to clean up after failure, but
* this still has a major limitation: elog(FATAL) can occur while waiting * this still has a major limitation: elog(FATAL) can occur while waiting
* (eg, a "die" interrupt), and then control won't come back here. So all * (eg, a "die" interrupt), and then control won't come back here. So all
* cleanup of essential state should happen in LockWaitCancel, not here. * cleanup of essential state should happen in LockErrorCleanup, not here.
* We can use PG_TRY to clear the "waiting" status flags, since doing that * We can use PG_TRY to clear the "waiting" status flags, since doing that
* is unimportant if the process exits. * is unimportant if the process exits.
*/ */
...@@ -1441,7 +1500,7 @@ WaitOnLock(LOCALLOCK *locallock, ResourceOwner owner) ...@@ -1441,7 +1500,7 @@ WaitOnLock(LOCALLOCK *locallock, ResourceOwner owner)
} }
PG_CATCH(); PG_CATCH();
{ {
/* In this path, awaitedLock remains set until LockWaitCancel */ /* In this path, awaitedLock remains set until LockErrorCleanup */
/* Report change to non-waiting status */ /* Report change to non-waiting status */
pgstat_report_waiting(false); pgstat_report_waiting(false);
......
...@@ -635,17 +635,20 @@ IsWaitingForLock(void) ...@@ -635,17 +635,20 @@ IsWaitingForLock(void)
} }
/* /*
* Cancel any pending wait for lock, when aborting a transaction. * Cancel any pending wait for lock, when aborting a transaction, and revert
* any strong lock count acquisition for a lock being acquired.
* *
* (Normally, this would only happen if we accept a cancel/die * (Normally, this would only happen if we accept a cancel/die
* interrupt while waiting; but an ereport(ERROR) while waiting is * interrupt while waiting; but an ereport(ERROR) before or during the lock
* within the realm of possibility, too.) * wait is within the realm of possibility, too.)
*/ */
void void
LockWaitCancel(void) LockErrorCleanup(void)
{ {
LWLockId partitionLock; LWLockId partitionLock;
AbortStrongLockAcquire();
/* Nothing to do if we weren't waiting for a lock */ /* Nothing to do if we weren't waiting for a lock */
if (lockAwaited == NULL) if (lockAwaited == NULL)
return; return;
...@@ -709,7 +712,7 @@ ProcReleaseLocks(bool isCommit) ...@@ -709,7 +712,7 @@ ProcReleaseLocks(bool isCommit)
if (!MyProc) if (!MyProc)
return; return;
/* If waiting, get off wait queue (should only be needed after error) */ /* If waiting, get off wait queue (should only be needed after error) */
LockWaitCancel(); LockErrorCleanup();
/* Release locks */ /* Release locks */
LockReleaseAll(DEFAULT_LOCKMETHOD, !isCommit); LockReleaseAll(DEFAULT_LOCKMETHOD, !isCommit);
...@@ -1019,7 +1022,7 @@ ProcSleep(LOCALLOCK *locallock, LockMethod lockMethodTable) ...@@ -1019,7 +1022,7 @@ ProcSleep(LOCALLOCK *locallock, LockMethod lockMethodTable)
* NOTE: this may also cause us to exit critical-section state, possibly * NOTE: this may also cause us to exit critical-section state, possibly
* allowing a cancel/die interrupt to be accepted. This is OK because we * allowing a cancel/die interrupt to be accepted. This is OK because we
* have recorded the fact that we are waiting for a lock, and so * have recorded the fact that we are waiting for a lock, and so
* LockWaitCancel will clean up if cancel/die happens. * LockErrorCleanup will clean up if cancel/die happens.
*/ */
LWLockRelease(partitionLock); LWLockRelease(partitionLock);
...@@ -1062,7 +1065,7 @@ ProcSleep(LOCALLOCK *locallock, LockMethod lockMethodTable) ...@@ -1062,7 +1065,7 @@ ProcSleep(LOCALLOCK *locallock, LockMethod lockMethodTable)
* don't, because we have no shared-state-change work to do after being * don't, because we have no shared-state-change work to do after being
* granted the lock (the grantor did it all). We do have to worry about * granted the lock (the grantor did it all). We do have to worry about
* updating the locallock table, but if we lose control to an error, * updating the locallock table, but if we lose control to an error,
* LockWaitCancel will fix that up. * LockErrorCleanup will fix that up.
*/ */
do do
{ {
...@@ -1207,7 +1210,7 @@ ProcSleep(LOCALLOCK *locallock, LockMethod lockMethodTable) ...@@ -1207,7 +1210,7 @@ ProcSleep(LOCALLOCK *locallock, LockMethod lockMethodTable)
LWLockAcquire(partitionLock, LW_EXCLUSIVE); LWLockAcquire(partitionLock, LW_EXCLUSIVE);
/* /*
* We no longer want LockWaitCancel to do anything. * We no longer want LockErrorCleanup to do anything.
*/ */
lockAwaited = NULL; lockAwaited = NULL;
......
...@@ -2575,7 +2575,7 @@ die(SIGNAL_ARGS) ...@@ -2575,7 +2575,7 @@ die(SIGNAL_ARGS)
/* bump holdoff count to make ProcessInterrupts() a no-op */ /* bump holdoff count to make ProcessInterrupts() a no-op */
/* until we are done getting ready for it */ /* until we are done getting ready for it */
InterruptHoldoffCount++; InterruptHoldoffCount++;
LockWaitCancel(); /* prevent CheckDeadLock from running */ LockErrorCleanup(); /* prevent CheckDeadLock from running */
DisableNotifyInterrupt(); DisableNotifyInterrupt();
DisableCatchupInterrupt(); DisableCatchupInterrupt();
InterruptHoldoffCount--; InterruptHoldoffCount--;
...@@ -2617,7 +2617,7 @@ StatementCancelHandler(SIGNAL_ARGS) ...@@ -2617,7 +2617,7 @@ StatementCancelHandler(SIGNAL_ARGS)
/* bump holdoff count to make ProcessInterrupts() a no-op */ /* bump holdoff count to make ProcessInterrupts() a no-op */
/* until we are done getting ready for it */ /* until we are done getting ready for it */
InterruptHoldoffCount++; InterruptHoldoffCount++;
LockWaitCancel(); /* prevent CheckDeadLock from running */ LockErrorCleanup(); /* prevent CheckDeadLock from running */
DisableNotifyInterrupt(); DisableNotifyInterrupt();
DisableCatchupInterrupt(); DisableCatchupInterrupt();
InterruptHoldoffCount--; InterruptHoldoffCount--;
...@@ -2776,7 +2776,7 @@ RecoveryConflictInterrupt(ProcSignalReason reason) ...@@ -2776,7 +2776,7 @@ RecoveryConflictInterrupt(ProcSignalReason reason)
/* bump holdoff count to make ProcessInterrupts() a no-op */ /* bump holdoff count to make ProcessInterrupts() a no-op */
/* until we are done getting ready for it */ /* until we are done getting ready for it */
InterruptHoldoffCount++; InterruptHoldoffCount++;
LockWaitCancel(); /* prevent CheckDeadLock from running */ LockErrorCleanup(); /* prevent CheckDeadLock from running */
DisableNotifyInterrupt(); DisableNotifyInterrupt();
DisableCatchupInterrupt(); DisableCatchupInterrupt();
InterruptHoldoffCount--; InterruptHoldoffCount--;
......
...@@ -489,6 +489,7 @@ extern LockAcquireResult LockAcquireExtended(const LOCKTAG *locktag, ...@@ -489,6 +489,7 @@ extern LockAcquireResult LockAcquireExtended(const LOCKTAG *locktag,
bool sessionLock, bool sessionLock,
bool dontWait, bool dontWait,
bool report_memory_error); bool report_memory_error);
extern void AbortStrongLockAcquire(void);
extern bool LockRelease(const LOCKTAG *locktag, extern bool LockRelease(const LOCKTAG *locktag,
LOCKMODE lockmode, bool sessionLock); LOCKMODE lockmode, bool sessionLock);
extern void LockReleaseSession(LOCKMETHODID lockmethodid); extern void LockReleaseSession(LOCKMETHODID lockmethodid);
......
...@@ -244,7 +244,7 @@ extern int ProcSleep(LOCALLOCK *locallock, LockMethod lockMethodTable); ...@@ -244,7 +244,7 @@ extern int ProcSleep(LOCALLOCK *locallock, LockMethod lockMethodTable);
extern PGPROC *ProcWakeup(PGPROC *proc, int waitStatus); extern PGPROC *ProcWakeup(PGPROC *proc, int waitStatus);
extern void ProcLockWakeup(LockMethod lockMethodTable, LOCK *lock); extern void ProcLockWakeup(LockMethod lockMethodTable, LOCK *lock);
extern bool IsWaitingForLock(void); extern bool IsWaitingForLock(void);
extern void LockWaitCancel(void); extern void LockErrorCleanup(void);
extern void ProcWaitForSignal(void); extern void ProcWaitForSignal(void);
extern void ProcSendSignal(int pid); extern void ProcSendSignal(int pid);
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment