Commit c00dc337 authored by Tom Lane's avatar Tom Lane

Fix potential corruption of lock table in CREATE/DROP INDEX CONCURRENTLY.

If VirtualXactLock() has to wait for a transaction that holds its VXID lock
as a fast-path lock, it must first convert the fast-path lock to a regular
lock.  It failed to take the required "partition" lock on the main
shared-memory lock table while doing so.  This is the direct cause of the
assert failure in GetLockStatusData() recently observed in the buildfarm,
but more worryingly it could result in arbitrary corruption of the shared
lock table if some other process were concurrently engaged in modifying the
same partition of the lock table.  Fortunately, VirtualXactLock() is only
used by CREATE INDEX CONCURRENTLY and DROP INDEX CONCURRENTLY, so the
opportunities for failure are fewer than they might have been.

In passing, improve some comments and be a bit more consistent about
order of operations.
parent f31d5baf
...@@ -1017,6 +1017,12 @@ LockAcquireExtended(const LOCKTAG *locktag, ...@@ -1017,6 +1017,12 @@ LockAcquireExtended(const LOCKTAG *locktag,
/* /*
* Find or create LOCK and PROCLOCK objects as needed for a new lock * Find or create LOCK and PROCLOCK objects as needed for a new lock
* request. * request.
*
* Returns the PROCLOCK object, or NULL if we failed to create the objects
* for lack of shared memory.
*
* The appropriate partition lock must be held at entry, and will be
* held at exit.
*/ */
static PROCLOCK * static PROCLOCK *
SetupLockInTable(LockMethod lockMethodTable, PGPROC *proc, SetupLockInTable(LockMethod lockMethodTable, PGPROC *proc,
...@@ -2414,6 +2420,8 @@ FastPathUnGrantRelationLock(Oid relid, LOCKMODE lockmode) ...@@ -2414,6 +2420,8 @@ FastPathUnGrantRelationLock(Oid relid, LOCKMODE lockmode)
* FastPathTransferRelationLocks * FastPathTransferRelationLocks
* Transfer locks matching the given lock tag from per-backend fast-path * Transfer locks matching the given lock tag from per-backend fast-path
* arrays to the shared hash table. * arrays to the shared hash table.
*
* Returns true if successful, false if ran out of shared memory.
*/ */
static bool static bool
FastPathTransferRelationLocks(LockMethod lockMethodTable, const LOCKTAG *locktag, FastPathTransferRelationLocks(LockMethod lockMethodTable, const LOCKTAG *locktag,
...@@ -2529,6 +2537,7 @@ FastPathGetRelationLockEntry(LOCALLOCK *locallock) ...@@ -2529,6 +2537,7 @@ FastPathGetRelationLockEntry(LOCALLOCK *locallock)
locallock->hashcode, lockmode); locallock->hashcode, lockmode);
if (!proclock) if (!proclock)
{ {
LWLockRelease(partitionLock);
ereport(ERROR, ereport(ERROR,
(errcode(ERRCODE_OUT_OF_MEMORY), (errcode(ERRCODE_OUT_OF_MEMORY),
errmsg("out of shared memory"), errmsg("out of shared memory"),
...@@ -3422,9 +3431,6 @@ GetRunningTransactionLocks(int *nlocks) ...@@ -3422,9 +3431,6 @@ GetRunningTransactionLocks(int *nlocks)
for (i = 0; i < NUM_LOCK_PARTITIONS; i++) for (i = 0; i < NUM_LOCK_PARTITIONS; i++)
LWLockAcquire(FirstLockMgrLock + i, LW_SHARED); LWLockAcquire(FirstLockMgrLock + i, LW_SHARED);
/* Now scan the tables to copy the data */
hash_seq_init(&seqstat, LockMethodProcLockHash);
/* Now we can safely count the number of proclocks */ /* Now we can safely count the number of proclocks */
els = hash_get_num_entries(LockMethodProcLockHash); els = hash_get_num_entries(LockMethodProcLockHash);
...@@ -3434,6 +3440,9 @@ GetRunningTransactionLocks(int *nlocks) ...@@ -3434,6 +3440,9 @@ GetRunningTransactionLocks(int *nlocks)
*/ */
accessExclusiveLocks = palloc(els * sizeof(xl_standby_lock)); accessExclusiveLocks = palloc(els * sizeof(xl_standby_lock));
/* Now scan the tables to copy the data */
hash_seq_init(&seqstat, LockMethodProcLockHash);
/* /*
* If lock is a currently granted AccessExclusiveLock then it will have * If lock is a currently granted AccessExclusiveLock then it will have
* just one proclock holder, so locks are never accessed twice in this * just one proclock holder, so locks are never accessed twice in this
...@@ -3978,22 +3987,34 @@ VirtualXactLock(VirtualTransactionId vxid, bool wait) ...@@ -3978,22 +3987,34 @@ VirtualXactLock(VirtualTransactionId vxid, bool wait)
/* /*
* OK, we're going to need to sleep on the VXID. But first, we must set * OK, we're going to need to sleep on the VXID. But first, we must set
* up the primary lock table entry, if needed. * up the primary lock table entry, if needed (ie, convert the proc's
* fast-path lock on its VXID to a regular lock).
*/ */
if (proc->fpVXIDLock) if (proc->fpVXIDLock)
{ {
PROCLOCK *proclock; PROCLOCK *proclock;
uint32 hashcode; uint32 hashcode;
LWLockId partitionLock;
hashcode = LockTagHashCode(&tag); hashcode = LockTagHashCode(&tag);
partitionLock = LockHashPartitionLock(hashcode);
LWLockAcquire(partitionLock, LW_EXCLUSIVE);
proclock = SetupLockInTable(LockMethods[DEFAULT_LOCKMETHOD], proc, proclock = SetupLockInTable(LockMethods[DEFAULT_LOCKMETHOD], proc,
&tag, hashcode, ExclusiveLock); &tag, hashcode, ExclusiveLock);
if (!proclock) if (!proclock)
{
LWLockRelease(partitionLock);
ereport(ERROR, ereport(ERROR,
(errcode(ERRCODE_OUT_OF_MEMORY), (errcode(ERRCODE_OUT_OF_MEMORY),
errmsg("out of shared memory"), errmsg("out of shared memory"),
errhint("You might need to increase max_locks_per_transaction."))); errhint("You might need to increase max_locks_per_transaction.")));
}
GrantLock(proclock->tag.myLock, proclock, ExclusiveLock); GrantLock(proclock->tag.myLock, proclock, ExclusiveLock);
LWLockRelease(partitionLock);
proc->fpVXIDLock = false; proc->fpVXIDLock = false;
} }
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment