Commit 140b078d authored by Tom Lane's avatar Tom Lane

Improve LockAcquire API per my recent proposal. All error conditions

are now reported via elog, eliminating the need to test the result code
at most call sites.  Make it possible for the caller to distinguish a
freshly acquired lock from one already held in the current transaction.
Use that capability to avoid redundant AcceptInvalidationMessages() calls
in LockRelation().
parent 299c4420
...@@ -33,8 +33,8 @@ user_lock(uint32 id1, uint32 id2, LOCKMODE lockmode) ...@@ -33,8 +33,8 @@ user_lock(uint32 id1, uint32 id2, LOCKMODE lockmode)
SET_LOCKTAG_USERLOCK(tag, id1, id2); SET_LOCKTAG_USERLOCK(tag, id1, id2);
return LockAcquire(USER_LOCKMETHOD, &tag, InvalidTransactionId, return (LockAcquire(USER_LOCKMETHOD, &tag, InvalidTransactionId,
lockmode, true); lockmode, true) != LOCKACQUIRE_NOT_AVAIL);
} }
int int
......
...@@ -8,7 +8,7 @@ ...@@ -8,7 +8,7 @@
* *
* *
* IDENTIFICATION * IDENTIFICATION
* $PostgreSQL: pgsql/src/backend/storage/lmgr/lmgr.c,v 1.74 2005/05/19 21:35:46 tgl Exp $ * $PostgreSQL: pgsql/src/backend/storage/lmgr/lmgr.c,v 1.75 2005/05/29 22:45:02 tgl Exp $
* *
*------------------------------------------------------------------------- *-------------------------------------------------------------------------
*/ */
...@@ -136,24 +136,28 @@ void ...@@ -136,24 +136,28 @@ void
LockRelation(Relation relation, LOCKMODE lockmode) LockRelation(Relation relation, LOCKMODE lockmode)
{ {
LOCKTAG tag; LOCKTAG tag;
LockAcquireResult res;
SET_LOCKTAG_RELATION(tag, SET_LOCKTAG_RELATION(tag,
relation->rd_lockInfo.lockRelId.dbId, relation->rd_lockInfo.lockRelId.dbId,
relation->rd_lockInfo.lockRelId.relId); relation->rd_lockInfo.lockRelId.relId);
if (!LockAcquire(LockTableId, &tag, GetTopTransactionId(), res = LockAcquire(LockTableId, &tag, GetTopTransactionId(),
lockmode, false)) lockmode, false);
elog(ERROR, "LockAcquire failed");
/* /*
* Check to see if the relcache entry has been invalidated while we * Check to see if the relcache entry has been invalidated while we
* were waiting to lock it. If so, rebuild it, or ereport() trying. * were waiting to lock it. If so, rebuild it, or ereport() trying.
* Increment the refcount to ensure that RelationFlushRelation will * Increment the refcount to ensure that RelationFlushRelation will
* rebuild it and not just delete it. * rebuild it and not just delete it. We can skip this if the lock
* was already held, however.
*/ */
RelationIncrementReferenceCount(relation); if (res != LOCKACQUIRE_ALREADY_HELD)
AcceptInvalidationMessages(); {
RelationDecrementReferenceCount(relation); RelationIncrementReferenceCount(relation);
AcceptInvalidationMessages();
RelationDecrementReferenceCount(relation);
}
} }
/* /*
...@@ -169,24 +173,31 @@ bool ...@@ -169,24 +173,31 @@ bool
ConditionalLockRelation(Relation relation, LOCKMODE lockmode) ConditionalLockRelation(Relation relation, LOCKMODE lockmode)
{ {
LOCKTAG tag; LOCKTAG tag;
LockAcquireResult res;
SET_LOCKTAG_RELATION(tag, SET_LOCKTAG_RELATION(tag,
relation->rd_lockInfo.lockRelId.dbId, relation->rd_lockInfo.lockRelId.dbId,
relation->rd_lockInfo.lockRelId.relId); relation->rd_lockInfo.lockRelId.relId);
if (!LockAcquire(LockTableId, &tag, GetTopTransactionId(), res = LockAcquire(LockTableId, &tag, GetTopTransactionId(),
lockmode, true)) lockmode, true);
if (res == LOCKACQUIRE_NOT_AVAIL)
return false; return false;
/* /*
* Check to see if the relcache entry has been invalidated while we * Check to see if the relcache entry has been invalidated while we
* were waiting to lock it. If so, rebuild it, or ereport() trying. * were waiting to lock it. If so, rebuild it, or ereport() trying.
* Increment the refcount to ensure that RelationFlushRelation will * Increment the refcount to ensure that RelationFlushRelation will
* rebuild it and not just delete it. * rebuild it and not just delete it. We can skip this if the lock
* was already held, however.
*/ */
RelationIncrementReferenceCount(relation); if (res != LOCKACQUIRE_ALREADY_HELD)
AcceptInvalidationMessages(); {
RelationDecrementReferenceCount(relation); RelationIncrementReferenceCount(relation);
AcceptInvalidationMessages();
RelationDecrementReferenceCount(relation);
}
return true; return true;
} }
...@@ -225,9 +236,8 @@ LockRelationForSession(LockRelId *relid, LOCKMODE lockmode) ...@@ -225,9 +236,8 @@ LockRelationForSession(LockRelId *relid, LOCKMODE lockmode)
SET_LOCKTAG_RELATION(tag, relid->dbId, relid->relId); SET_LOCKTAG_RELATION(tag, relid->dbId, relid->relId);
if (!LockAcquire(LockTableId, &tag, InvalidTransactionId, (void) LockAcquire(LockTableId, &tag, InvalidTransactionId,
lockmode, false)) lockmode, false);
elog(ERROR, "LockAcquire failed");
} }
/* /*
...@@ -262,9 +272,8 @@ LockRelationForExtension(Relation relation, LOCKMODE lockmode) ...@@ -262,9 +272,8 @@ LockRelationForExtension(Relation relation, LOCKMODE lockmode)
relation->rd_lockInfo.lockRelId.dbId, relation->rd_lockInfo.lockRelId.dbId,
relation->rd_lockInfo.lockRelId.relId); relation->rd_lockInfo.lockRelId.relId);
if (!LockAcquire(LockTableId, &tag, GetTopTransactionId(), (void) LockAcquire(LockTableId, &tag, GetTopTransactionId(),
lockmode, false)) lockmode, false);
elog(ERROR, "LockAcquire failed");
} }
/* /*
...@@ -298,9 +307,8 @@ LockPage(Relation relation, BlockNumber blkno, LOCKMODE lockmode) ...@@ -298,9 +307,8 @@ LockPage(Relation relation, BlockNumber blkno, LOCKMODE lockmode)
relation->rd_lockInfo.lockRelId.relId, relation->rd_lockInfo.lockRelId.relId,
blkno); blkno);
if (!LockAcquire(LockTableId, &tag, GetTopTransactionId(), (void) LockAcquire(LockTableId, &tag, GetTopTransactionId(),
lockmode, false)) lockmode, false);
elog(ERROR, "LockAcquire failed");
} }
/* /*
...@@ -319,8 +327,8 @@ ConditionalLockPage(Relation relation, BlockNumber blkno, LOCKMODE lockmode) ...@@ -319,8 +327,8 @@ ConditionalLockPage(Relation relation, BlockNumber blkno, LOCKMODE lockmode)
relation->rd_lockInfo.lockRelId.relId, relation->rd_lockInfo.lockRelId.relId,
blkno); blkno);
return LockAcquire(LockTableId, &tag, GetTopTransactionId(), return (LockAcquire(LockTableId, &tag, GetTopTransactionId(),
lockmode, true); lockmode, true) != LOCKACQUIRE_NOT_AVAIL);
} }
/* /*
...@@ -357,9 +365,8 @@ LockTuple(Relation relation, ItemPointer tid, LOCKMODE lockmode) ...@@ -357,9 +365,8 @@ LockTuple(Relation relation, ItemPointer tid, LOCKMODE lockmode)
ItemPointerGetBlockNumber(tid), ItemPointerGetBlockNumber(tid),
ItemPointerGetOffsetNumber(tid)); ItemPointerGetOffsetNumber(tid));
if (!LockAcquire(LockTableId, &tag, GetTopTransactionId(), (void) LockAcquire(LockTableId, &tag, GetTopTransactionId(),
lockmode, false)) lockmode, false);
elog(ERROR, "LockAcquire failed");
} }
/* /*
...@@ -393,9 +400,8 @@ XactLockTableInsert(TransactionId xid) ...@@ -393,9 +400,8 @@ XactLockTableInsert(TransactionId xid)
SET_LOCKTAG_TRANSACTION(tag, xid); SET_LOCKTAG_TRANSACTION(tag, xid);
if (!LockAcquire(LockTableId, &tag, GetTopTransactionId(), (void) LockAcquire(LockTableId, &tag, GetTopTransactionId(),
ExclusiveLock, false)) ExclusiveLock, false);
elog(ERROR, "LockAcquire failed");
} }
/* /*
...@@ -441,8 +447,9 @@ XactLockTableWait(TransactionId xid) ...@@ -441,8 +447,9 @@ XactLockTableWait(TransactionId xid)
SET_LOCKTAG_TRANSACTION(tag, xid); SET_LOCKTAG_TRANSACTION(tag, xid);
if (!LockAcquire(LockTableId, &tag, myxid, ShareLock, false)) (void) LockAcquire(LockTableId, &tag, myxid,
elog(ERROR, "LockAcquire failed"); ShareLock, false);
LockRelease(LockTableId, &tag, myxid, ShareLock); LockRelease(LockTableId, &tag, myxid, ShareLock);
if (!TransactionIdIsInProgress(xid)) if (!TransactionIdIsInProgress(xid))
...@@ -479,9 +486,8 @@ LockDatabaseObject(Oid classid, Oid objid, uint16 objsubid, ...@@ -479,9 +486,8 @@ LockDatabaseObject(Oid classid, Oid objid, uint16 objsubid,
objid, objid,
objsubid); objsubid);
if (!LockAcquire(LockTableId, &tag, GetTopTransactionId(), (void) LockAcquire(LockTableId, &tag, GetTopTransactionId(),
lockmode, false)) lockmode, false);
elog(ERROR, "LockAcquire failed");
} }
/* /*
...@@ -519,9 +525,8 @@ LockSharedObject(Oid classid, Oid objid, uint16 objsubid, ...@@ -519,9 +525,8 @@ LockSharedObject(Oid classid, Oid objid, uint16 objsubid,
objid, objid,
objsubid); objsubid);
if (!LockAcquire(LockTableId, &tag, GetTopTransactionId(), (void) LockAcquire(LockTableId, &tag, GetTopTransactionId(),
lockmode, false)) lockmode, false);
elog(ERROR, "LockAcquire failed");
} }
/* /*
......
...@@ -8,7 +8,7 @@ ...@@ -8,7 +8,7 @@
* *
* *
* IDENTIFICATION * IDENTIFICATION
* $PostgreSQL: pgsql/src/backend/storage/lmgr/lock.c,v 1.153 2005/05/29 04:23:04 tgl Exp $ * $PostgreSQL: pgsql/src/backend/storage/lmgr/lock.c,v 1.154 2005/05/29 22:45:02 tgl Exp $
* *
* NOTES * NOTES
* Outside modules can create a lock table and acquire/release * Outside modules can create a lock table and acquire/release
...@@ -160,8 +160,8 @@ PROCLOCK_PRINT(const char *where, const PROCLOCK *proclockP) ...@@ -160,8 +160,8 @@ PROCLOCK_PRINT(const char *where, const PROCLOCK *proclockP)
static void RemoveLocalLock(LOCALLOCK *locallock); static void RemoveLocalLock(LOCALLOCK *locallock);
static void GrantLockLocal(LOCALLOCK *locallock, ResourceOwner owner); static void GrantLockLocal(LOCALLOCK *locallock, ResourceOwner owner);
static int WaitOnLock(LOCKMETHODID lockmethodid, LOCALLOCK *locallock, static void WaitOnLock(LOCKMETHODID lockmethodid, LOCALLOCK *locallock,
ResourceOwner owner); ResourceOwner owner);
static void LockCountMyLocks(SHMEM_OFFSET lockOffset, PGPROC *proc, static void LockCountMyLocks(SHMEM_OFFSET lockOffset, PGPROC *proc,
int *myHolding); int *myHolding);
static bool UnGrantLock(LOCK *lock, LOCKMODE lockmode, static bool UnGrantLock(LOCK *lock, LOCKMODE lockmode,
...@@ -377,11 +377,14 @@ LockMethodTableRename(LOCKMETHODID lockmethodid) ...@@ -377,11 +377,14 @@ LockMethodTableRename(LOCKMETHODID lockmethodid)
* LockAcquire -- Check for lock conflicts, sleep if conflict found, * LockAcquire -- Check for lock conflicts, sleep if conflict found,
* set lock if/when no conflicts. * set lock if/when no conflicts.
* *
* Returns: TRUE if lock was acquired, FALSE otherwise. Note that * Returns one of:
* a FALSE return is to be expected if dontWait is TRUE; * LOCKACQUIRE_NOT_AVAIL lock not available, and dontWait=true
* but if dontWait is FALSE, only a parameter error can cause * LOCKACQUIRE_OK lock successfully acquired
* a FALSE return. (XXX probably we should just ereport on parameter * LOCKACQUIRE_ALREADY_HELD incremented count for lock already held
* errors, instead of conflating this with failure to acquire lock?) *
* In the normal case where dontWait=false and the caller doesn't need to
* distinguish a freshly acquired lock from one already taken earlier in
* this same transaction, there is no need to examine the return value.
* *
* Side Effects: The lock is acquired and recorded in lock tables. * Side Effects: The lock is acquired and recorded in lock tables.
* *
...@@ -416,8 +419,7 @@ LockMethodTableRename(LOCKMETHODID lockmethodid) ...@@ -416,8 +419,7 @@ LockMethodTableRename(LOCKMETHODID lockmethodid)
* *
* DZ - 22 Nov 1997 * DZ - 22 Nov 1997
*/ */
LockAcquireResult
bool
LockAcquire(LOCKMETHODID lockmethodid, LOCKTAG *locktag, LockAcquire(LOCKMETHODID lockmethodid, LOCKTAG *locktag,
TransactionId xid, LOCKMODE lockmode, bool dontWait) TransactionId xid, LOCKMODE lockmode, bool dontWait)
{ {
...@@ -447,10 +449,7 @@ LockAcquire(LOCKMETHODID lockmethodid, LOCKTAG *locktag, ...@@ -447,10 +449,7 @@ LockAcquire(LOCKMETHODID lockmethodid, LOCKTAG *locktag,
Assert(lockmethodid < NumLockMethods); Assert(lockmethodid < NumLockMethods);
lockMethodTable = LockMethods[lockmethodid]; lockMethodTable = LockMethods[lockmethodid];
if (!lockMethodTable) if (!lockMethodTable)
{ elog(ERROR, "unrecognized lock method: %d", lockmethodid);
elog(WARNING, "bad lock table id: %d", lockmethodid);
return FALSE;
}
/* Session locks and user locks are not transactional */ /* Session locks and user locks are not transactional */
if (xid != InvalidTransactionId && if (xid != InvalidTransactionId &&
...@@ -507,7 +506,7 @@ LockAcquire(LOCKMETHODID lockmethodid, LOCKTAG *locktag, ...@@ -507,7 +506,7 @@ LockAcquire(LOCKMETHODID lockmethodid, LOCKTAG *locktag,
if (locallock->nLocks > 0) if (locallock->nLocks > 0)
{ {
GrantLockLocal(locallock, owner); GrantLockLocal(locallock, owner);
return TRUE; return LOCKACQUIRE_ALREADY_HELD;
} }
/* /*
...@@ -669,7 +668,7 @@ LockAcquire(LOCKMETHODID lockmethodid, LOCKTAG *locktag, ...@@ -669,7 +668,7 @@ LockAcquire(LOCKMETHODID lockmethodid, LOCKTAG *locktag,
GrantLockLocal(locallock, owner); GrantLockLocal(locallock, owner);
PROCLOCK_PRINT("LockAcquire: my other XID owning", proclock); PROCLOCK_PRINT("LockAcquire: my other XID owning", proclock);
LWLockRelease(masterLock); LWLockRelease(masterLock);
return TRUE; return LOCKACQUIRE_ALREADY_HELD;
} }
/* /*
...@@ -696,8 +695,8 @@ LockAcquire(LOCKMETHODID lockmethodid, LOCKTAG *locktag, ...@@ -696,8 +695,8 @@ LockAcquire(LOCKMETHODID lockmethodid, LOCKTAG *locktag,
/* /*
* We can't acquire the lock immediately. If caller specified no * We can't acquire the lock immediately. If caller specified no
* blocking, remove useless table entries and return FALSE without * blocking, remove useless table entries and return NOT_AVAIL
* waiting. * without waiting.
*/ */
if (dontWait) if (dontWait)
{ {
...@@ -720,7 +719,7 @@ LockAcquire(LOCKMETHODID lockmethodid, LOCKTAG *locktag, ...@@ -720,7 +719,7 @@ LockAcquire(LOCKMETHODID lockmethodid, LOCKTAG *locktag,
LWLockRelease(masterLock); LWLockRelease(masterLock);
if (locallock->nLocks == 0) if (locallock->nLocks == 0)
RemoveLocalLock(locallock); RemoveLocalLock(locallock);
return FALSE; return LOCKACQUIRE_NOT_AVAIL;
} }
/* /*
...@@ -740,7 +739,7 @@ LockAcquire(LOCKMETHODID lockmethodid, LOCKTAG *locktag, ...@@ -740,7 +739,7 @@ LockAcquire(LOCKMETHODID lockmethodid, LOCKTAG *locktag,
/* /*
* Sleep till someone wakes me up. * Sleep till someone wakes me up.
*/ */
status = WaitOnLock(lockmethodid, locallock, owner); WaitOnLock(lockmethodid, locallock, owner);
/* /*
* NOTE: do not do any material change of state between here and * NOTE: do not do any material change of state between here and
...@@ -759,7 +758,7 @@ LockAcquire(LOCKMETHODID lockmethodid, LOCKTAG *locktag, ...@@ -759,7 +758,7 @@ LockAcquire(LOCKMETHODID lockmethodid, LOCKTAG *locktag,
LOCK_PRINT("LockAcquire: INCONSISTENT", lock, lockmode); LOCK_PRINT("LockAcquire: INCONSISTENT", lock, lockmode);
/* Should we retry ? */ /* Should we retry ? */
LWLockRelease(masterLock); LWLockRelease(masterLock);
return FALSE; elog(ERROR, "LockAcquire failed");
} }
PROCLOCK_PRINT("LockAcquire: granted", proclock); PROCLOCK_PRINT("LockAcquire: granted", proclock);
LOCK_PRINT("LockAcquire: granted", lock, lockmode); LOCK_PRINT("LockAcquire: granted", lock, lockmode);
...@@ -767,7 +766,7 @@ LockAcquire(LOCKMETHODID lockmethodid, LOCKTAG *locktag, ...@@ -767,7 +766,7 @@ LockAcquire(LOCKMETHODID lockmethodid, LOCKTAG *locktag,
LWLockRelease(masterLock); LWLockRelease(masterLock);
return status == STATUS_OK; return LOCKACQUIRE_OK;
} }
/* /*
...@@ -1091,7 +1090,7 @@ GrantAwaitedLock(void) ...@@ -1091,7 +1090,7 @@ GrantAwaitedLock(void)
* *
* The locktable's masterLock must be held at entry. * The locktable's masterLock must be held at entry.
*/ */
static int static void
WaitOnLock(LOCKMETHODID lockmethodid, LOCALLOCK *locallock, WaitOnLock(LOCKMETHODID lockmethodid, LOCALLOCK *locallock,
ResourceOwner owner) ResourceOwner owner)
{ {
...@@ -1159,7 +1158,6 @@ WaitOnLock(LOCKMETHODID lockmethodid, LOCALLOCK *locallock, ...@@ -1159,7 +1158,6 @@ WaitOnLock(LOCKMETHODID lockmethodid, LOCALLOCK *locallock,
LOCK_PRINT("WaitOnLock: wakeup on lock", LOCK_PRINT("WaitOnLock: wakeup on lock",
locallock->lock, locallock->tag.mode); locallock->lock, locallock->tag.mode);
return STATUS_OK;
} }
/* /*
...@@ -1247,7 +1245,7 @@ LockRelease(LOCKMETHODID lockmethodid, LOCKTAG *locktag, ...@@ -1247,7 +1245,7 @@ LockRelease(LOCKMETHODID lockmethodid, LOCKTAG *locktag,
Assert(lockmethodid < NumLockMethods); Assert(lockmethodid < NumLockMethods);
lockMethodTable = LockMethods[lockmethodid]; lockMethodTable = LockMethods[lockmethodid];
if (!lockMethodTable) if (!lockMethodTable)
elog(ERROR, "bad lock method: %d", lockmethodid); elog(ERROR, "unrecognized lock method: %d", lockmethodid);
/* /*
* Find the LOCALLOCK entry for this lock and lockmode * Find the LOCALLOCK entry for this lock and lockmode
...@@ -1397,7 +1395,7 @@ LockReleaseAll(LOCKMETHODID lockmethodid, bool allxids) ...@@ -1397,7 +1395,7 @@ LockReleaseAll(LOCKMETHODID lockmethodid, bool allxids)
Assert(lockmethodid < NumLockMethods); Assert(lockmethodid < NumLockMethods);
lockMethodTable = LockMethods[lockmethodid]; lockMethodTable = LockMethods[lockmethodid];
if (!lockMethodTable) if (!lockMethodTable)
elog(ERROR, "bad lock method: %d", lockmethodid); elog(ERROR, "unrecognized lock method: %d", lockmethodid);
numLockModes = lockMethodTable->numLockModes; numLockModes = lockMethodTable->numLockModes;
masterLock = lockMethodTable->masterLock; masterLock = lockMethodTable->masterLock;
......
...@@ -7,7 +7,7 @@ ...@@ -7,7 +7,7 @@
* Portions Copyright (c) 1996-2005, PostgreSQL Global Development Group * Portions Copyright (c) 1996-2005, PostgreSQL Global Development Group
* Portions Copyright (c) 1994, Regents of the University of California * Portions Copyright (c) 1994, Regents of the University of California
* *
* $PostgreSQL: pgsql/src/include/storage/lock.h,v 1.86 2005/05/19 23:30:18 tgl Exp $ * $PostgreSQL: pgsql/src/include/storage/lock.h,v 1.87 2005/05/29 22:45:02 tgl Exp $
* *
*------------------------------------------------------------------------- *-------------------------------------------------------------------------
*/ */
...@@ -348,6 +348,15 @@ typedef struct ...@@ -348,6 +348,15 @@ typedef struct
} LockData; } LockData;
/* Result codes for LockAcquire() */
typedef enum
{
LOCKACQUIRE_NOT_AVAIL, /* lock not available, and dontWait=true */
LOCKACQUIRE_OK, /* lock successfully acquired */
LOCKACQUIRE_ALREADY_HELD /* incremented count for lock already held */
} LockAcquireResult;
/* /*
* function prototypes * function prototypes
*/ */
...@@ -357,7 +366,7 @@ extern LOCKMETHODID LockMethodTableInit(const char *tabName, ...@@ -357,7 +366,7 @@ extern LOCKMETHODID LockMethodTableInit(const char *tabName,
const LOCKMASK *conflictsP, const LOCKMASK *conflictsP,
int numModes, int maxBackends); int numModes, int maxBackends);
extern LOCKMETHODID LockMethodTableRename(LOCKMETHODID lockmethodid); extern LOCKMETHODID LockMethodTableRename(LOCKMETHODID lockmethodid);
extern bool LockAcquire(LOCKMETHODID lockmethodid, LOCKTAG *locktag, extern LockAcquireResult LockAcquire(LOCKMETHODID lockmethodid, LOCKTAG *locktag,
TransactionId xid, LOCKMODE lockmode, bool dontWait); TransactionId xid, LOCKMODE lockmode, bool dontWait);
extern bool LockRelease(LOCKMETHODID lockmethodid, LOCKTAG *locktag, extern bool LockRelease(LOCKMETHODID lockmethodid, LOCKTAG *locktag,
TransactionId xid, LOCKMODE lockmode); TransactionId xid, LOCKMODE lockmode);
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment