Commit e06fda0a authored by Tom Lane's avatar Tom Lane

Add a function GetLockConflicts() to lock.c to report xacts holding

locks that would conflict with a specified lock request, without
actually trying to get that lock.  Use this instead of the former ad hoc
method of doing the first wait step in CREATE INDEX CONCURRENTLY.
Fixes problem with undetected deadlock and in many cases will allow the
index creation to proceed sooner than it otherwise could've.  Per
discussion with Greg Stark.
parent ca1fd0ea
...@@ -8,7 +8,7 @@ ...@@ -8,7 +8,7 @@
* *
* *
* IDENTIFICATION * IDENTIFICATION
* $PostgreSQL: pgsql/src/backend/commands/indexcmds.c,v 1.147 2006/08/25 04:06:48 tgl Exp $ * $PostgreSQL: pgsql/src/backend/commands/indexcmds.c,v 1.148 2006/08/27 19:14:34 tgl Exp $
* *
*------------------------------------------------------------------------- *-------------------------------------------------------------------------
*/ */
...@@ -119,8 +119,11 @@ DefineIndex(RangeVar *heapRelation, ...@@ -119,8 +119,11 @@ DefineIndex(RangeVar *heapRelation,
Datum reloptions; Datum reloptions;
IndexInfo *indexInfo; IndexInfo *indexInfo;
int numberOfAttributes; int numberOfAttributes;
List *old_xact_list;
ListCell *lc;
uint32 ixcnt; uint32 ixcnt;
LockRelId heaprelid; LockRelId heaprelid;
LOCKTAG heaplocktag;
Snapshot snapshot; Snapshot snapshot;
Relation pg_index; Relation pg_index;
HeapTuple indexTuple; HeapTuple indexTuple;
...@@ -466,33 +469,26 @@ DefineIndex(RangeVar *heapRelation, ...@@ -466,33 +469,26 @@ DefineIndex(RangeVar *heapRelation,
CommitTransactionCommand(); CommitTransactionCommand();
StartTransactionCommand(); StartTransactionCommand();
/* Establish transaction snapshot ... else GetLatestSnapshot complains */
(void) GetTransactionSnapshot();
/* /*
* Now we must wait until no running transaction could have the table open * Now we must wait until no running transaction could have the table open
* with the old list of indexes. If we can take an exclusive lock then * with the old list of indexes. To do this, inquire which xacts currently
* there are none now and anybody who opens it later will get the new * would conflict with ShareLock on the table -- ie, which ones have
* index in their relcache entry. Alternatively, if our Xmin reaches our * a lock that permits writing the table. Then wait for each of these
* own (new) transaction then we know no transactions that started before * xacts to commit or abort. Note we do not need to worry about xacts
* the index was visible are left anyway. * that open the table for writing after this point; they will see the
* new index when they open it.
*
* Note: GetLockConflicts() never reports our own xid,
* hence we need not check for that.
*/ */
for (;;) SET_LOCKTAG_RELATION(heaplocktag, heaprelid.dbId, heaprelid.relId);
{ old_xact_list = GetLockConflicts(&heaplocktag, ShareLock);
CHECK_FOR_INTERRUPTS();
if (ConditionalLockRelationOid(relationId, ExclusiveLock)) foreach(lc, old_xact_list)
{ {
/* Release the lock right away to avoid blocking anyone */ TransactionId xid = lfirst_xid(lc);
UnlockRelationOid(relationId, ExclusiveLock);
break;
}
if (TransactionIdEquals(GetLatestSnapshot()->xmin,
GetTopTransactionId()))
break;
pg_usleep(1000000L); /* 1 sec */ XactLockTableWait(xid);
} }
/* /*
......
...@@ -8,7 +8,7 @@ ...@@ -8,7 +8,7 @@
* *
* *
* IDENTIFICATION * IDENTIFICATION
* $PostgreSQL: pgsql/src/backend/storage/lmgr/lock.c,v 1.171 2006/08/19 01:36:28 tgl Exp $ * $PostgreSQL: pgsql/src/backend/storage/lmgr/lock.c,v 1.172 2006/08/27 19:14:34 tgl Exp $
* *
* NOTES * NOTES
* A lock table is a shared memory hash table. When * A lock table is a shared memory hash table. When
...@@ -1685,6 +1685,104 @@ LockReassignCurrentOwner(void) ...@@ -1685,6 +1685,104 @@ LockReassignCurrentOwner(void)
} }
/*
* GetLockConflicts
* Get a list of TransactionIds of xacts currently holding locks
* that would conflict with the specified lock/lockmode.
* xacts merely awaiting such a lock are NOT reported.
*
* Of course, the result could be out of date by the time it's returned,
* so use of this function has to be thought about carefully.
*
* Only top-level XIDs are reported. Note we never include the current xact
* in the result list, since an xact never blocks itself.
*/
List *
GetLockConflicts(const LOCKTAG *locktag, LOCKMODE lockmode)
{
List *result = NIL;
LOCKMETHODID lockmethodid = locktag->locktag_lockmethodid;
LockMethod lockMethodTable;
LOCK *lock;
LOCKMASK conflictMask;
SHM_QUEUE *procLocks;
PROCLOCK *proclock;
uint32 hashcode;
LWLockId partitionLock;
if (lockmethodid <= 0 || lockmethodid >= lengthof(LockMethods))
elog(ERROR, "unrecognized lock method: %d", lockmethodid);
lockMethodTable = LockMethods[lockmethodid];
if (lockmode <= 0 || lockmode > lockMethodTable->numLockModes)
elog(ERROR, "unrecognized lock mode: %d", lockmode);
/*
* Look up the lock object matching the tag.
*/
hashcode = LockTagHashCode(locktag);
partitionLock = LockHashPartitionLock(hashcode);
LWLockAcquire(partitionLock, LW_SHARED);
lock = (LOCK *) hash_search_with_hash_value(LockMethodLockHash,
(void *) locktag,
hashcode,
HASH_FIND,
NULL);
if (!lock)
{
/*
* If the lock object doesn't exist, there is nothing holding a
* lock on this lockable object.
*/
LWLockRelease(partitionLock);
return NIL;
}
/*
* Examine each existing holder (or awaiter) of the lock.
*/
conflictMask = lockMethodTable->conflictTab[lockmode];
procLocks = &(lock->procLocks);
proclock = (PROCLOCK *) SHMQueueNext(procLocks, procLocks,
offsetof(PROCLOCK, lockLink));
while (proclock)
{
if (conflictMask & proclock->holdMask)
{
PGPROC *proc = proclock->tag.myProc;
/* A backend never blocks itself */
if (proc != MyProc)
{
/* Fetch xid just once - see GetNewTransactionId */
TransactionId xid = proc->xid;
/*
* Race condition: during xact commit/abort we zero out
* PGPROC's xid before we mark its locks released. If we see
* zero in the xid field, assume the xact is in process of
* shutting down and act as though the lock is already
* released.
*/
if (TransactionIdIsValid(xid))
result = lappend_xid(result, xid);
}
}
proclock = (PROCLOCK *) SHMQueueNext(procLocks, &proclock->lockLink,
offsetof(PROCLOCK, lockLink));
}
LWLockRelease(partitionLock);
return result;
}
/* /*
* AtPrepare_Locks * AtPrepare_Locks
* Do the preparatory work for a PREPARE: make 2PC state file records * Do the preparatory work for a PREPARE: make 2PC state file records
......
...@@ -7,13 +7,14 @@ ...@@ -7,13 +7,14 @@
* Portions Copyright (c) 1996-2006, PostgreSQL Global Development Group * Portions Copyright (c) 1996-2006, PostgreSQL Global Development Group
* Portions Copyright (c) 1994, Regents of the University of California * Portions Copyright (c) 1994, Regents of the University of California
* *
* $PostgreSQL: pgsql/src/include/storage/lock.h,v 1.97 2006/07/31 20:09:05 tgl Exp $ * $PostgreSQL: pgsql/src/include/storage/lock.h,v 1.98 2006/08/27 19:14:34 tgl Exp $
* *
*------------------------------------------------------------------------- *-------------------------------------------------------------------------
*/ */
#ifndef LOCK_H_ #ifndef LOCK_H_
#define LOCK_H_ #define LOCK_H_
#include "nodes/pg_list.h"
#include "storage/itemptr.h" #include "storage/itemptr.h"
#include "storage/lwlock.h" #include "storage/lwlock.h"
#include "storage/shmem.h" #include "storage/shmem.h"
...@@ -412,6 +413,7 @@ extern bool LockRelease(const LOCKTAG *locktag, ...@@ -412,6 +413,7 @@ extern bool LockRelease(const LOCKTAG *locktag,
extern void LockReleaseAll(LOCKMETHODID lockmethodid, bool allLocks); extern void LockReleaseAll(LOCKMETHODID lockmethodid, bool allLocks);
extern void LockReleaseCurrentOwner(void); extern void LockReleaseCurrentOwner(void);
extern void LockReassignCurrentOwner(void); extern void LockReassignCurrentOwner(void);
extern List *GetLockConflicts(const LOCKTAG *locktag, LOCKMODE lockmode);
extern void AtPrepare_Locks(void); extern void AtPrepare_Locks(void);
extern void PostPrepare_Locks(TransactionId xid); extern void PostPrepare_Locks(TransactionId xid);
extern int LockCheckConflicts(LockMethod lockMethodTable, extern int LockCheckConflicts(LockMethod lockMethodTable,
...@@ -421,13 +423,6 @@ extern void GrantLock(LOCK *lock, PROCLOCK *proclock, LOCKMODE lockmode); ...@@ -421,13 +423,6 @@ extern void GrantLock(LOCK *lock, PROCLOCK *proclock, LOCKMODE lockmode);
extern void GrantAwaitedLock(void); extern void GrantAwaitedLock(void);
extern void RemoveFromWaitQueue(PGPROC *proc, uint32 hashcode); extern void RemoveFromWaitQueue(PGPROC *proc, uint32 hashcode);
extern Size LockShmemSize(void); extern Size LockShmemSize(void);
extern bool DeadLockCheck(PGPROC *proc);
extern void DeadLockReport(void);
extern void RememberSimpleDeadLock(PGPROC *proc1,
LOCKMODE lockmode,
LOCK *lock,
PGPROC *proc2);
extern void InitDeadLockChecking(void);
extern LockData *GetLockStatusData(void); extern LockData *GetLockStatusData(void);
extern const char *GetLockmodeName(LOCKMETHODID lockmethodid, LOCKMODE mode); extern const char *GetLockmodeName(LOCKMETHODID lockmethodid, LOCKMODE mode);
...@@ -438,6 +433,14 @@ extern void lock_twophase_postcommit(TransactionId xid, uint16 info, ...@@ -438,6 +433,14 @@ extern void lock_twophase_postcommit(TransactionId xid, uint16 info,
extern void lock_twophase_postabort(TransactionId xid, uint16 info, extern void lock_twophase_postabort(TransactionId xid, uint16 info,
void *recdata, uint32 len); void *recdata, uint32 len);
extern bool DeadLockCheck(PGPROC *proc);
extern void DeadLockReport(void);
extern void RememberSimpleDeadLock(PGPROC *proc1,
LOCKMODE lockmode,
LOCK *lock,
PGPROC *proc2);
extern void InitDeadLockChecking(void);
#ifdef LOCK_DEBUG #ifdef LOCK_DEBUG
extern void DumpLocks(PGPROC *proc); extern void DumpLocks(PGPROC *proc);
extern void DumpAllLocks(void); extern void DumpAllLocks(void);
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment