Commit 07ab1383 authored by Robert Haas's avatar Robert Haas

Fix two more bugs in fast-path relation locking.

First, the previous code failed to account for the fact that, during Hot
Standby operation, the startup process takes AccessExclusiveLocks on
relations without setting MyDatabaseId.  This resulted in fast path
strong lock counts failing to be incremented with the startup process
took locks, which in turn allowed conflicting lock requests to succeed
when they should not have.  Report by Erik Rijkers, diagnosis by Heikki
Linnakangas.

Second, LockReleaseAll() failed to honor the allLocks and lockmethodid
restrictions with respect to fast-path locks.  It's not clear to me
whether this produces any user-visible breakage at the moment, but it's
certainly wrong.  Rearrange order of operations in LockReleaseAll to fix.
Noted by Tom Lane.
parent 932ded2e
......@@ -192,14 +192,17 @@ static int FastPathLocalUseCount = 0;
* self-conflicting, it can't use the fast-path mechanism; but it also does
* not conflict with any of the locks that do, so we can ignore it completely.
*/
#define FastPathTag(locktag) \
#define EligibleForRelationFastPath(locktag, mode) \
((locktag)->locktag_lockmethodid == DEFAULT_LOCKMETHOD && \
(locktag)->locktag_type == LOCKTAG_RELATION && \
(locktag)->locktag_field1 == MyDatabaseId && \
MyDatabaseId != InvalidOid)
#define FastPathWeakMode(mode) ((mode) < ShareUpdateExclusiveLock)
#define FastPathStrongMode(mode) ((mode) > ShareUpdateExclusiveLock)
#define FastPathRelevantMode(mode) ((mode) != ShareUpdateExclusiveLock)
MyDatabaseId != InvalidOid && \
(mode) < ShareUpdateExclusiveLock)
#define ConflictsWithRelationFastPath(locktag, mode) \
((locktag)->locktag_lockmethodid == DEFAULT_LOCKMETHOD && \
(locktag)->locktag_type == LOCKTAG_RELATION && \
(locktag)->locktag_field1 != InvalidOid && \
(mode) > ShareUpdateExclusiveLock)
static bool FastPathGrantRelationLock(Oid relid, LOCKMODE lockmode);
static bool FastPathUnGrantRelationLock(Oid relid, LOCKMODE lockmode);
......@@ -697,26 +700,20 @@ LockAcquireExtended(const LOCKTAG *locktag,
log_lock = true;
}
/* Locks that participate in the fast path require special handling. */
if (FastPathTag(locktag) && FastPathRelevantMode(lockmode))
{
uint32 fasthashcode;
fasthashcode = FastPathStrongLockHashPartition(hashcode);
/*
* If we remember having filled up the fast path array, we don't
* attempt to make any further use of it until we release some locks.
* It's possible that some other backend has transferred some of those
* locks to the shared hash table, leaving space free, but it's not
* worth acquiring the LWLock just to check. It's also possible that
* we're acquiring a second or third lock type on a relation we have
* already locked using the fast-path, but for now we don't worry about
* that case either.
* Attempt to take lock via fast path, if eligible. But if we remember
* having filled up the fast path array, we don't attempt to make any
* further use of it until we release some locks. It's possible that some
* other backend has transferred some of those locks to the shared hash
* table, leaving space free, but it's not worth acquiring the LWLock just
* to check. It's also possible that we're acquiring a second or third
* lock type on a relation we have already locked using the fast-path, but
* for now we don't worry about that case either.
*/
if (FastPathWeakMode(lockmode)
if (EligibleForRelationFastPath(locktag, lockmode)
&& FastPathLocalUseCount < FP_LOCK_SLOTS_PER_BACKEND)
{
uint32 fasthashcode = FastPathStrongLockHashPartition(hashcode);
bool acquired;
/*
......@@ -738,8 +735,17 @@ LockAcquireExtended(const LOCKTAG *locktag,
return LOCKACQUIRE_OK;
}
}
else if (FastPathStrongMode(lockmode))
/*
* If this lock could potentially have been taken via the fast-path by
* some other backend, we must (temporarily) disable further use of the
* fast-path for this lock tag, and migrate any locks already taken via
* this method to the main lock table.
*/
if (ConflictsWithRelationFastPath(locktag, lockmode))
{
uint32 fasthashcode = FastPathStrongLockHashPartition(hashcode);
BeginStrongLockAcquire(locallock, fasthashcode);
if (!FastPathTransferRelationLocks(lockMethodTable, locktag,
hashcode))
......@@ -754,10 +760,11 @@ LockAcquireExtended(const LOCKTAG *locktag,
return LOCKACQUIRE_NOT_AVAIL;
}
}
}
/*
* Otherwise we've got to mess with the shared lock table.
* We didn't find the lock in our LOCALLOCK table, and we didn't manage
* to take it via the fast-path, either, so we've got to mess with the
* shared lock table.
*/
partitionLock = LockHashPartitionLock(hashcode);
......@@ -1688,8 +1695,8 @@ LockRelease(const LOCKTAG *locktag, LOCKMODE lockmode, bool sessionLock)
if (locallock->nLocks > 0)
return TRUE;
/* Locks that participate in the fast path require special handling. */
if (FastPathTag(locktag) && FastPathWeakMode(lockmode)
/* Attempt fast release of any lock eligible for the fast path. */
if (EligibleForRelationFastPath(locktag, lockmode)
&& FastPathLocalUseCount > 0)
{
bool released;
......@@ -1729,7 +1736,7 @@ LockRelease(const LOCKTAG *locktag, LOCKMODE lockmode, bool sessionLock)
PROCLOCKTAG proclocktag;
bool found;
Assert(FastPathTag(locktag) && FastPathWeakMode(lockmode));
Assert(EligibleForRelationFastPath(locktag, lockmode));
lock = (LOCK *) hash_search_with_hash_value(LockMethodLockHash,
(const void *) locktag,
locallock->hashcode,
......@@ -1830,11 +1837,6 @@ LockReleaseAll(LOCKMETHODID lockmethodid, bool allLocks)
while ((locallock = (LOCALLOCK *) hash_seq_search(&status)) != NULL)
{
if (locallock->proclock == NULL || locallock->lock == NULL)
{
LOCKMODE lockmode = locallock->tag.mode;
Oid relid;
/*
* If the LOCALLOCK entry is unused, we must've run out of shared
* memory while trying to set up this lock. Just forget the local
......@@ -1846,12 +1848,52 @@ LockReleaseAll(LOCKMETHODID lockmethodid, bool allLocks)
continue;
}
/* Ignore items that are not of the lockmethod to be removed */
if (LOCALLOCK_LOCKMETHOD(*locallock) != lockmethodid)
continue;
/*
* If we are asked to release all locks, we can just zap the entry.
* Otherwise, must scan to see if there are session locks. We assume
* there is at most one lockOwners entry for session locks.
*/
if (!allLocks)
{
LOCALLOCKOWNER *lockOwners = locallock->lockOwners;
/* If it's above array position 0, move it down to 0 */
for (i = locallock->numLockOwners - 1; i > 0; i--)
{
if (lockOwners[i].owner == NULL)
{
lockOwners[0] = lockOwners[i];
break;
}
}
if (locallock->numLockOwners > 0 &&
lockOwners[0].owner == NULL &&
lockOwners[0].nLocks > 0)
{
/* Fix the locallock to show just the session locks */
locallock->nLocks = lockOwners[0].nLocks;
locallock->numLockOwners = 1;
/* We aren't deleting this locallock, so done */
continue;
}
}
/*
* Otherwise, we should be dealing with a lock acquired via the
* fast-path. If not, we've got trouble.
* If the lock or proclock pointers are NULL, this lock was taken via
* the relation fast-path.
*/
if (!FastPathTag(&locallock->tag.lock)
|| !FastPathWeakMode(lockmode))
if (locallock->proclock == NULL || locallock->lock == NULL)
{
LOCKMODE lockmode = locallock->tag.mode;
Oid relid;
/* Verify that a fast-path lock is what we've got. */
if (!EligibleForRelationFastPath(&locallock->tag.lock, lockmode))
elog(PANIC, "locallock table corrupted");
/*
......@@ -1894,41 +1936,6 @@ LockReleaseAll(LOCKMETHODID lockmethodid, bool allLocks)
continue;
}
/* Ignore items that are not of the lockmethod to be removed */
if (LOCALLOCK_LOCKMETHOD(*locallock) != lockmethodid)
continue;
/*
* If we are asked to release all locks, we can just zap the entry.
* Otherwise, must scan to see if there are session locks. We assume
* there is at most one lockOwners entry for session locks.
*/
if (!allLocks)
{
LOCALLOCKOWNER *lockOwners = locallock->lockOwners;
/* If it's above array position 0, move it down to 0 */
for (i = locallock->numLockOwners - 1; i > 0; i--)
{
if (lockOwners[i].owner == NULL)
{
lockOwners[0] = lockOwners[i];
break;
}
}
if (locallock->numLockOwners > 0 &&
lockOwners[0].owner == NULL &&
lockOwners[0].nLocks > 0)
{
/* Fix the locallock to show just the session locks */
locallock->nLocks = lockOwners[0].nLocks;
locallock->numLockOwners = 1;
/* We aren't deleting this locallock, so done */
continue;
}
}
/* Mark the proclock to show we need to release this lockmode */
if (locallock->nLocks > 0)
locallock->proclock->releaseMask |= LOCKBIT_ON(locallock->tag.mode);
......@@ -2481,10 +2488,10 @@ GetLockConflicts(const LOCKTAG *locktag, LOCKMODE lockmode)
/*
* Fast path locks might not have been entered in the primary lock table.
* But only strong locks can conflict with anything that might have been
* taken via the fast-path mechanism.
* If the lock we're dealing with could conflict with such a lock, we must
* examine each backend's fast-path array for conflicts.
*/
if (FastPathTag(locktag) && FastPathStrongMode(lockmode))
if (ConflictsWithRelationFastPath(locktag, lockmode))
{
int i;
Oid relid = locktag->locktag_field2;
......@@ -2722,7 +2729,7 @@ LockRefindAndRelease(LockMethod lockMethodTable, PGPROC *proc,
* Decrement strong lock count. This logic is needed only for 2PC.
*/
if (decrement_strong_lock_count
&& FastPathTag(&lock->tag) && FastPathStrongMode(lockmode))
&& ConflictsWithRelationFastPath(&lock->tag, lockmode))
{
uint32 fasthashcode = FastPathStrongLockHashPartition(hashcode);
SpinLockAcquire(&FastPathStrongRelationLocks->mutex);
......@@ -3604,7 +3611,7 @@ lock_twophase_recover(TransactionId xid, uint16 info,
* Bump strong lock count, to make sure any fast-path lock requests won't
* be granted without consulting the primary lock table.
*/
if (FastPathTag(&lock->tag) && FastPathStrongMode(lockmode))
if (ConflictsWithRelationFastPath(&lock->tag, lockmode))
{
uint32 fasthashcode = FastPathStrongLockHashPartition(hashcode);
SpinLockAcquire(&FastPathStrongRelationLocks->mutex);
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment