Commit 8563ccae authored by Tom Lane's avatar Tom Lane

Simplify shared-memory lock data structures as per recent discussion:

it is sufficient to track whether a backend holds a lock or not, and
store information about transaction vs. session locks only in the
inside-the-backend LocalLockTable.  Since there can now be but one
PROCLOCK per lock per backend, LockCountMyLocks() is no longer needed,
thus eliminating some O(N^2) behavior when a backend holds many locks.
Also simplify the LockAcquire/LockRelease API by passing just a
'sessionLock' boolean instead of a transaction ID.  The previous API
was designed with the idea that per-transaction lock holding would be
important for subtransactions, but now that we have subtransactions we
know that this is unwanted.  While at it, add an 'isTempObject' parameter
to LockAcquire to indicate whether the lock is being taken on a temp
table.  This is not used just yet, but will be needed shortly for
two-phase commit.
parent f5835b4b
......@@ -33,8 +33,8 @@ user_lock(uint32 id1, uint32 id2, LOCKMODE lockmode)
SET_LOCKTAG_USERLOCK(tag, id1, id2);
return (LockAcquire(USER_LOCKMETHOD, &tag, InvalidTransactionId,
lockmode, true) != LOCKACQUIRE_NOT_AVAIL);
return (LockAcquire(USER_LOCKMETHOD, &tag, false,
lockmode, true, true) != LOCKACQUIRE_NOT_AVAIL);
}
int
......@@ -44,7 +44,7 @@ user_unlock(uint32 id1, uint32 id2, LOCKMODE lockmode)
SET_LOCKTAG_USERLOCK(tag, id1, id2);
return LockRelease(USER_LOCKMETHOD, &tag, InvalidTransactionId, lockmode);
return LockRelease(USER_LOCKMETHOD, &tag, lockmode, true);
}
int
......
......@@ -13,7 +13,7 @@
*
*
* IDENTIFICATION
* $PostgreSQL: pgsql/src/backend/commands/vacuum.c,v 1.309 2005/05/19 21:35:45 tgl Exp $
* $PostgreSQL: pgsql/src/backend/commands/vacuum.c,v 1.310 2005/06/14 22:15:32 tgl Exp $
*
*-------------------------------------------------------------------------
*/
......@@ -1009,7 +1009,7 @@ vacuum_rel(Oid relid, VacuumStmt *vacstmt, char expected_relkind)
* same process.
*/
onerelid = onerel->rd_lockInfo.lockRelId;
LockRelationForSession(&onerelid, lmode);
LockRelationForSession(&onerelid, onerel->rd_istemp, lmode);
/*
* Remember the relation's TOAST relation for later
......
$PostgreSQL: pgsql/src/backend/storage/lmgr/README,v 1.16 2005/04/29 22:28:24 tgl Exp $
$PostgreSQL: pgsql/src/backend/storage/lmgr/README,v 1.17 2005/06/14 22:15:32 tgl Exp $
LOCKING OVERVIEW
......@@ -57,16 +57,17 @@ details.
There are two fundamental lock structures in shared memory: the
per-lockable-object LOCK struct, and the per-lock-and-requestor PROCLOCK
struct. A LOCK object exists for each lockable object that currently has
locks held or requested on it. A PROCLOCK struct exists for each transaction
locks held or requested on it. A PROCLOCK struct exists for each backend
that is holding or requesting lock(s) on each LOCK object.
In addition to these, each backend maintains an unshared LOCALLOCK structure
for each lockable object and lock mode that it is currently holding or
requesting. The shared lock structures only allow a single lock grant to
be made per lockable object/lock mode/transaction. Internally to a backend,
be made per lockable object/lock mode/backend. Internally to a backend,
however, the same lock may be requested and perhaps released multiple times
in a transaction. The internal request counts are held in LOCALLOCK so that
the shared LockMgrLock need not be obtained to alter them.
in a transaction, and it can also be held both transactionally and session-
wide. The internal request counts are held in LOCALLOCK so that the shared
LockMgrLock need not be obtained to alter them.
---------------------------------------------------------------------------
......@@ -112,9 +113,8 @@ nRequested -
acquired. The count includes attempts by processes which were put
to sleep due to conflicts. It also counts the same backend twice
if, for example, a backend process first acquires a read and then
acquires a write, or acquires the lock under two different transaction
IDs. (But multiple acquisitions of the same lock/lock mode under the
same transaction ID are not multiply counted here; they are recorded
acquires a write. (But multiple acquisitions of the same lock/lock mode
within a backend are not multiply counted here; they are recorded
only in the backend's LOCALLOCK structure.)
requested -
......@@ -153,23 +153,17 @@ tag -
tag.proc
SHMEM offset of PROC of backend process that owns this PROCLOCK.
tag.xid
XID of transaction this PROCLOCK is for, or InvalidTransactionId
if the PROCLOCK is for session-level locking.
Note that this structure will support multiple transactions running
concurrently in one backend. Currently we do not use it for that
purpose: subtransactions acquire locks in the name of their top parent
transaction, to simplify reassigning lock ownership at subtransaction end.
So the XID field is really only needed to distinguish per-transaction
locks from session locks. User locks are always session locks, and we
also use session locks for multi-transaction operations like VACUUM.
holdMask -
A bitmask for the lock types successfully acquired by this PROCLOCK.
This should be a subset of the LOCK object's grantMask, and also a
subset of the PGPROC object's heldLocks mask.
releaseMask -
A bitmask for the lock types due to be released during LockReleaseAll.
This must be a subset of the holdMask. Note that it is modified without
taking the LockMgrLock, and therefore it is unsafe for any backend except
the one owning the PROCLOCK to examine/change it.
lockLink -
List link for shared memory queue of all the PROCLOCK objects for the
same LOCK.
......
......@@ -8,7 +8,7 @@
*
*
* IDENTIFICATION
* $PostgreSQL: pgsql/src/backend/storage/lmgr/lmgr.c,v 1.75 2005/05/29 22:45:02 tgl Exp $
* $PostgreSQL: pgsql/src/backend/storage/lmgr/lmgr.c,v 1.76 2005/06/14 22:15:32 tgl Exp $
*
*-------------------------------------------------------------------------
*/
......@@ -142,8 +142,8 @@ LockRelation(Relation relation, LOCKMODE lockmode)
relation->rd_lockInfo.lockRelId.dbId,
relation->rd_lockInfo.lockRelId.relId);
res = LockAcquire(LockTableId, &tag, GetTopTransactionId(),
lockmode, false);
res = LockAcquire(LockTableId, &tag, relation->rd_istemp,
lockmode, false, false);
/*
* Check to see if the relcache entry has been invalidated while we
......@@ -179,8 +179,8 @@ ConditionalLockRelation(Relation relation, LOCKMODE lockmode)
relation->rd_lockInfo.lockRelId.dbId,
relation->rd_lockInfo.lockRelId.relId);
res = LockAcquire(LockTableId, &tag, GetTopTransactionId(),
lockmode, true);
res = LockAcquire(LockTableId, &tag, relation->rd_istemp,
lockmode, false, true);
if (res == LOCKACQUIRE_NOT_AVAIL)
return false;
......@@ -214,7 +214,7 @@ UnlockRelation(Relation relation, LOCKMODE lockmode)
relation->rd_lockInfo.lockRelId.dbId,
relation->rd_lockInfo.lockRelId.relId);
LockRelease(LockTableId, &tag, GetTopTransactionId(), lockmode);
LockRelease(LockTableId, &tag, lockmode, false);
}
/*
......@@ -230,14 +230,14 @@ UnlockRelation(Relation relation, LOCKMODE lockmode)
* relcache entry is up to date.
*/
void
LockRelationForSession(LockRelId *relid, LOCKMODE lockmode)
LockRelationForSession(LockRelId *relid, bool istemprel, LOCKMODE lockmode)
{
LOCKTAG tag;
SET_LOCKTAG_RELATION(tag, relid->dbId, relid->relId);
(void) LockAcquire(LockTableId, &tag, InvalidTransactionId,
lockmode, false);
(void) LockAcquire(LockTableId, &tag, istemprel,
lockmode, true, false);
}
/*
......@@ -250,7 +250,7 @@ UnlockRelationForSession(LockRelId *relid, LOCKMODE lockmode)
SET_LOCKTAG_RELATION(tag, relid->dbId, relid->relId);
LockRelease(LockTableId, &tag, InvalidTransactionId, lockmode);
LockRelease(LockTableId, &tag, lockmode, true);
}
/*
......@@ -272,8 +272,8 @@ LockRelationForExtension(Relation relation, LOCKMODE lockmode)
relation->rd_lockInfo.lockRelId.dbId,
relation->rd_lockInfo.lockRelId.relId);
(void) LockAcquire(LockTableId, &tag, GetTopTransactionId(),
lockmode, false);
(void) LockAcquire(LockTableId, &tag, relation->rd_istemp,
lockmode, false, false);
}
/*
......@@ -288,7 +288,7 @@ UnlockRelationForExtension(Relation relation, LOCKMODE lockmode)
relation->rd_lockInfo.lockRelId.dbId,
relation->rd_lockInfo.lockRelId.relId);
LockRelease(LockTableId, &tag, GetTopTransactionId(), lockmode);
LockRelease(LockTableId, &tag, lockmode, false);
}
/*
......@@ -307,8 +307,8 @@ LockPage(Relation relation, BlockNumber blkno, LOCKMODE lockmode)
relation->rd_lockInfo.lockRelId.relId,
blkno);
(void) LockAcquire(LockTableId, &tag, GetTopTransactionId(),
lockmode, false);
(void) LockAcquire(LockTableId, &tag, relation->rd_istemp,
lockmode, false, false);
}
/*
......@@ -327,8 +327,8 @@ ConditionalLockPage(Relation relation, BlockNumber blkno, LOCKMODE lockmode)
relation->rd_lockInfo.lockRelId.relId,
blkno);
return (LockAcquire(LockTableId, &tag, GetTopTransactionId(),
lockmode, true) != LOCKACQUIRE_NOT_AVAIL);
return (LockAcquire(LockTableId, &tag, relation->rd_istemp,
lockmode, false, true) != LOCKACQUIRE_NOT_AVAIL);
}
/*
......@@ -344,7 +344,7 @@ UnlockPage(Relation relation, BlockNumber blkno, LOCKMODE lockmode)
relation->rd_lockInfo.lockRelId.relId,
blkno);
LockRelease(LockTableId, &tag, GetTopTransactionId(), lockmode);
LockRelease(LockTableId, &tag, lockmode, false);
}
/*
......@@ -365,8 +365,8 @@ LockTuple(Relation relation, ItemPointer tid, LOCKMODE lockmode)
ItemPointerGetBlockNumber(tid),
ItemPointerGetOffsetNumber(tid));
(void) LockAcquire(LockTableId, &tag, GetTopTransactionId(),
lockmode, false);
(void) LockAcquire(LockTableId, &tag, relation->rd_istemp,
lockmode, false, false);
}
/*
......@@ -383,7 +383,7 @@ UnlockTuple(Relation relation, ItemPointer tid, LOCKMODE lockmode)
ItemPointerGetBlockNumber(tid),
ItemPointerGetOffsetNumber(tid));
LockRelease(LockTableId, &tag, GetTopTransactionId(), lockmode);
LockRelease(LockTableId, &tag, lockmode, false);
}
/*
......@@ -400,8 +400,8 @@ XactLockTableInsert(TransactionId xid)
SET_LOCKTAG_TRANSACTION(tag, xid);
(void) LockAcquire(LockTableId, &tag, GetTopTransactionId(),
ExclusiveLock, false);
(void) LockAcquire(LockTableId, &tag, false,
ExclusiveLock, false, false);
}
/*
......@@ -419,7 +419,7 @@ XactLockTableDelete(TransactionId xid)
SET_LOCKTAG_TRANSACTION(tag, xid);
LockRelease(LockTableId, &tag, GetTopTransactionId(), ExclusiveLock);
LockRelease(LockTableId, &tag, ExclusiveLock, false);
}
/*
......@@ -438,19 +438,18 @@ void
XactLockTableWait(TransactionId xid)
{
LOCKTAG tag;
TransactionId myxid = GetTopTransactionId();
for (;;)
{
Assert(TransactionIdIsValid(xid));
Assert(!TransactionIdEquals(xid, myxid));
Assert(!TransactionIdEquals(xid, GetTopTransactionId()));
SET_LOCKTAG_TRANSACTION(tag, xid);
(void) LockAcquire(LockTableId, &tag, myxid,
ShareLock, false);
(void) LockAcquire(LockTableId, &tag, false,
ShareLock, false, false);
LockRelease(LockTableId, &tag, myxid, ShareLock);
LockRelease(LockTableId, &tag, ShareLock, false);
if (!TransactionIdIsInProgress(xid))
break;
......@@ -470,9 +469,11 @@ XactLockTableWait(TransactionId xid)
* LockDatabaseObject
*
* Obtain a lock on a general object of the current database. Don't use
* this for shared objects (such as tablespaces). It's usually unwise to
* apply it to entire relations, also, since a lock taken this way will
* NOT conflict with LockRelation.
* this for shared objects (such as tablespaces). It's unwise to apply it
* to relations, also, since a lock taken this way will NOT conflict with
* LockRelation, and also may be wrongly marked if the relation is temp.
* (If we ever invent temp objects that aren't tables, we'll want to extend
* the API of this routine to include an isTempObject flag.)
*/
void
LockDatabaseObject(Oid classid, Oid objid, uint16 objsubid,
......@@ -486,8 +487,8 @@ LockDatabaseObject(Oid classid, Oid objid, uint16 objsubid,
objid,
objsubid);
(void) LockAcquire(LockTableId, &tag, GetTopTransactionId(),
lockmode, false);
(void) LockAcquire(LockTableId, &tag, false,
lockmode, false, false);
}
/*
......@@ -505,7 +506,7 @@ UnlockDatabaseObject(Oid classid, Oid objid, uint16 objsubid,
objid,
objsubid);
LockRelease(LockTableId, &tag, GetTopTransactionId(), lockmode);
LockRelease(LockTableId, &tag, lockmode, false);
}
/*
......@@ -525,8 +526,8 @@ LockSharedObject(Oid classid, Oid objid, uint16 objsubid,
objid,
objsubid);
(void) LockAcquire(LockTableId, &tag, GetTopTransactionId(),
lockmode, false);
(void) LockAcquire(LockTableId, &tag, false,
lockmode, false, false);
}
/*
......@@ -544,5 +545,5 @@ UnlockSharedObject(Oid classid, Oid objid, uint16 objsubid,
objid,
objsubid);
LockRelease(LockTableId, &tag, GetTopTransactionId(), lockmode);
LockRelease(LockTableId, &tag, lockmode, false);
}
......@@ -8,7 +8,7 @@
*
*
* IDENTIFICATION
* $PostgreSQL: pgsql/src/backend/storage/lmgr/lock.c,v 1.154 2005/05/29 22:45:02 tgl Exp $
* $PostgreSQL: pgsql/src/backend/storage/lmgr/lock.c,v 1.155 2005/06/14 22:15:32 tgl Exp $
*
* NOTES
* Outside modules can create a lock table and acquire/release
......@@ -144,11 +144,10 @@ PROCLOCK_PRINT(const char *where, const PROCLOCK *proclockP)
{
if (LOCK_DEBUG_ENABLED((LOCK *) MAKE_PTR(proclockP->tag.lock)))
elog(LOG,
"%s: proclock(%lx) lock(%lx) method(%u) proc(%lx) xid(%u) hold(%x)",
"%s: proclock(%lx) lock(%lx) method(%u) proc(%lx) hold(%x)",
where, MAKE_OFFSET(proclockP), proclockP->tag.lock,
PROCLOCK_LOCKMETHOD(*(proclockP)),
proclockP->tag.proc, proclockP->tag.xid,
(int) proclockP->holdMask);
proclockP->tag.proc, (int) proclockP->holdMask);
}
#else /* not LOCK_DEBUG */
......@@ -162,8 +161,6 @@ static void RemoveLocalLock(LOCALLOCK *locallock);
static void GrantLockLocal(LOCALLOCK *locallock, ResourceOwner owner);
static void WaitOnLock(LOCKMETHODID lockmethodid, LOCALLOCK *locallock,
ResourceOwner owner);
static void LockCountMyLocks(SHMEM_OFFSET lockOffset, PGPROC *proc,
int *myHolding);
static bool UnGrantLock(LOCK *lock, LOCKMODE lockmode,
PROCLOCK *proclock, LockMethod lockMethodTable);
static void CleanUpLock(LOCKMETHODID lockmethodid, LOCK *lock,
......@@ -377,6 +374,15 @@ LockMethodTableRename(LOCKMETHODID lockmethodid)
* LockAcquire -- Check for lock conflicts, sleep if conflict found,
* set lock if/when no conflicts.
*
* Inputs:
* lockmethodid: identifies which lock table to use
* locktag: unique identifier for the lockable object
* isTempObject: is the lockable object a temporary object? (Under 2PC,
* such locks cannot be persisted)
* lockmode: lock mode to acquire
* sessionLock: if true, acquire lock for session not current transaction
* dontWait: if true, don't wait to acquire lock
*
* Returns one of:
* LOCKACQUIRE_NOT_AVAIL lock not available, and dontWait=true
* LOCKACQUIRE_OK lock successfully acquired
......@@ -420,8 +426,12 @@ LockMethodTableRename(LOCKMETHODID lockmethodid)
* DZ - 22 Nov 1997
*/
LockAcquireResult
LockAcquire(LOCKMETHODID lockmethodid, LOCKTAG *locktag,
TransactionId xid, LOCKMODE lockmode, bool dontWait)
LockAcquire(LOCKMETHODID lockmethodid,
LOCKTAG *locktag,
bool isTempObject,
LOCKMODE lockmode,
bool sessionLock,
bool dontWait)
{
LOCALLOCKTAG localtag;
LOCALLOCK *locallock;
......@@ -433,8 +443,6 @@ LockAcquire(LOCKMETHODID lockmethodid, LOCKTAG *locktag,
LWLockId masterLock;
LockMethod lockMethodTable;
int status;
int myHolding[MAX_LOCKMODES];
int i;
#ifdef LOCK_DEBUG
if (Trace_userlocks && lockmethodid == USER_LOCKMETHOD)
......@@ -452,8 +460,7 @@ LockAcquire(LOCKMETHODID lockmethodid, LOCKTAG *locktag,
elog(ERROR, "unrecognized lock method: %d", lockmethodid);
/* Session locks and user locks are not transactional */
if (xid != InvalidTransactionId &&
lockmethodid == DEFAULT_LOCKMETHOD)
if (!sessionLock && lockmethodid == DEFAULT_LOCKMETHOD)
owner = CurrentResourceOwner;
else
owner = NULL;
......@@ -463,7 +470,6 @@ LockAcquire(LOCKMETHODID lockmethodid, LOCKTAG *locktag,
*/
MemSet(&localtag, 0, sizeof(localtag)); /* must clear padding */
localtag.lock = *locktag;
localtag.xid = xid;
localtag.mode = lockmode;
locallock = (LOCALLOCK *) hash_search(LockMethodLocalHash[lockmethodid],
......@@ -477,6 +483,7 @@ LockAcquire(LOCKMETHODID lockmethodid, LOCKTAG *locktag,
{
locallock->lock = NULL;
locallock->proclock = NULL;
locallock->isTempObject = isTempObject;
locallock->nLocks = 0;
locallock->numLockOwners = 0;
locallock->maxLockOwners = 8;
......@@ -487,6 +494,8 @@ LockAcquire(LOCKMETHODID lockmethodid, LOCKTAG *locktag,
}
else
{
Assert(locallock->isTempObject == isTempObject);
/* Make sure there will be room to remember the lock */
if (locallock->numLockOwners >= locallock->maxLockOwners)
{
......@@ -566,7 +575,6 @@ LockAcquire(LOCKMETHODID lockmethodid, LOCKTAG *locktag,
MemSet(&proclocktag, 0, sizeof(PROCLOCKTAG)); /* must clear padding */
proclocktag.lock = MAKE_OFFSET(lock);
proclocktag.proc = MAKE_OFFSET(MyProc);
TransactionIdStore(xid, &proclocktag.xid);
/*
* Find or create a proclock entry with this tag
......@@ -605,6 +613,7 @@ LockAcquire(LOCKMETHODID lockmethodid, LOCKTAG *locktag,
if (!found)
{
proclock->holdMask = 0;
proclock->releaseMask = 0;
/* Add proclock to appropriate lists */
SHMQueueInsertBefore(&lock->procLocks, &proclock->lockLink);
SHMQueueInsertBefore(&MyProc->procLocks, &proclock->procLink);
......@@ -632,6 +641,9 @@ LockAcquire(LOCKMETHODID lockmethodid, LOCKTAG *locktag,
* XXX Doing numeric comparison on the lockmodes is a hack; it'd be
* better to use a table. For now, though, this works.
*/
{
int i;
for (i = lockMethodTable->numLockModes; i > 0; i--)
{
if (proclock->holdMask & LOCKBIT_ON(i))
......@@ -641,10 +653,12 @@ LockAcquire(LOCKMETHODID lockmethodid, LOCKTAG *locktag,
elog(LOG, "deadlock risk: raising lock level"
" from %s to %s on object %u/%u/%u",
lock_mode_names[i], lock_mode_names[lockmode],
lock->tag.relId, lock->tag.dbId, lock->tag.objId.blkno);
lock->tag.locktag_field1, lock->tag.locktag_field2,
lock->tag.locktag_field3);
break;
}
}
}
#endif /* CHECK_DEADLOCK_RISK */
}
......@@ -658,18 +672,14 @@ LockAcquire(LOCKMETHODID lockmethodid, LOCKTAG *locktag,
Assert((lock->nRequested > 0) && (lock->requested[lockmode] > 0));
/*
* If this process (under any XID) is a holder of the lock, just grant
* myself another one without blocking.
* We shouldn't already hold the desired lock; else locallock table
* is broken.
*/
LockCountMyLocks(proclock->tag.lock, MyProc, myHolding);
if (myHolding[lockmode] > 0)
{
GrantLock(lock, proclock, lockmode);
GrantLockLocal(locallock, owner);
PROCLOCK_PRINT("LockAcquire: my other XID owning", proclock);
LWLockRelease(masterLock);
return LOCKACQUIRE_ALREADY_HELD;
}
if (proclock->holdMask & LOCKBIT_ON(lockmode))
elog(ERROR, "lock %s on object %u/%u/%u is already held",
lock_mode_names[lockmode],
lock->tag.locktag_field1, lock->tag.locktag_field2,
lock->tag.locktag_field3);
/*
* If lock requested conflicts with locks requested by waiters, must
......@@ -680,8 +690,7 @@ LockAcquire(LOCKMETHODID lockmethodid, LOCKTAG *locktag,
status = STATUS_FOUND;
else
status = LockCheckConflicts(lockMethodTable, lockmode,
lock, proclock,
MyProc, myHolding);
lock, proclock, MyProc);
if (status == STATUS_OK)
{
......@@ -723,18 +732,9 @@ LockAcquire(LOCKMETHODID lockmethodid, LOCKTAG *locktag,
}
/*
* Construct bitmask of locks this process holds on this object.
* Set bitmask of locks this process already holds on this object.
*/
{
LOCKMASK heldLocks = 0;
for (i = 1; i <= lockMethodTable->numLockModes; i++)
{
if (myHolding[i] > 0)
heldLocks |= LOCKBIT_ON(i);
}
MyProc->heldLocks = heldLocks;
}
MyProc->heldLocks = proclock->holdMask;
/*
* Sleep till someone wakes me up.
......@@ -793,26 +793,22 @@ RemoveLocalLock(LOCALLOCK *locallock)
*
* NOTES:
* Here's what makes this complicated: one process's locks don't
* conflict with one another, even if they are held under different
* transaction IDs (eg, session and xact locks do not conflict).
* conflict with one another, no matter what purpose they are held for
* (eg, session and transaction locks do not conflict).
* So, we must subtract off our own locks when determining whether the
* requested new lock conflicts with those already held.
*
* The caller can optionally pass the process's total holding counts, if
* known. If NULL is passed then these values will be computed internally.
*/
int
LockCheckConflicts(LockMethod lockMethodTable,
LOCKMODE lockmode,
LOCK *lock,
PROCLOCK *proclock,
PGPROC *proc,
int *myHolding) /* myHolding[] array or NULL */
PGPROC *proc)
{
int numLockModes = lockMethodTable->numLockModes;
LOCKMASK bitmask;
LOCKMASK myLocks;
LOCKMASK otherLocks;
int i;
int localHolding[MAX_LOCKMODES];
/*
* first check for global conflicts: If no locks conflict with my
......@@ -830,32 +826,26 @@ LockCheckConflicts(LockMethod lockMethodTable,
}
/*
* Rats. Something conflicts. But it could still be my own lock. We
* have to construct a conflict mask that does not reflect our own
* locks. Locks held by the current process under another XID also
* count as "our own locks".
* Rats. Something conflicts. But it could still be my own lock.
* We have to construct a conflict mask that does not reflect our own
* locks, but only lock types held by other processes.
*/
if (myHolding == NULL)
{
/* Caller didn't do calculation of total holding for me */
LockCountMyLocks(proclock->tag.lock, proc, localHolding);
myHolding = localHolding;
}
/* Compute mask of lock types held by other processes */
bitmask = 0;
myLocks = proclock->holdMask;
otherLocks = 0;
for (i = 1; i <= numLockModes; i++)
{
if (lock->granted[i] != myHolding[i])
bitmask |= LOCKBIT_ON(i);
int myHolding = (myLocks & LOCKBIT_ON(i)) ? 1 : 0;
if (lock->granted[i] > myHolding)
otherLocks |= LOCKBIT_ON(i);
}
/*
* now check again for conflicts. 'bitmask' describes the types of
* now check again for conflicts. 'otherLocks' describes the types of
* locks held by other processes. If one of these conflicts with the
* kind of lock that I want, there is a conflict and I have to sleep.
*/
if (!(lockMethodTable->conflictTab[lockmode] & bitmask))
if (!(lockMethodTable->conflictTab[lockmode] & otherLocks))
{
/* no conflict. OK to get the lock */
PROCLOCK_PRINT("LockCheckConflicts: resolved", proclock);
......@@ -866,46 +856,6 @@ LockCheckConflicts(LockMethod lockMethodTable,
return STATUS_FOUND;
}
/*
* LockCountMyLocks --- Count total number of locks held on a given lockable
* object by a given process (under any transaction ID).
*
* XXX This could be rather slow if the process holds a large number of locks.
* Perhaps it could be sped up if we kept yet a third hashtable of per-
* process lock information. However, for the normal case where a transaction
* doesn't hold a large number of locks, keeping such a table would probably
* be a net slowdown.
*/
static void
LockCountMyLocks(SHMEM_OFFSET lockOffset, PGPROC *proc, int *myHolding)
{
SHM_QUEUE *procLocks = &(proc->procLocks);
PROCLOCK *proclock;
MemSet(myHolding, 0, MAX_LOCKMODES * sizeof(int));
proclock = (PROCLOCK *) SHMQueueNext(procLocks, procLocks,
offsetof(PROCLOCK, procLink));
while (proclock)
{
if (lockOffset == proclock->tag.lock)
{
LOCKMASK holdMask = proclock->holdMask;
int i;
for (i = 1; i < MAX_LOCKMODES; i++)
{
if (holdMask & LOCKBIT_ON(i))
myHolding[i]++;
}
}
proclock = (PROCLOCK *) SHMQueueNext(procLocks, &proclock->procLink,
offsetof(PROCLOCK, procLink));
}
}
/*
* GrantLock -- update the lock and proclock data structures to show
* the lock request has been granted.
......@@ -1086,7 +1036,7 @@ GrantAwaitedLock(void)
* WaitOnLock -- wait to acquire a lock
*
* Caller must have set MyProc->heldLocks to reflect locks already held
* on the lockable object by this process (under all XIDs).
* on the lockable object by this process.
*
* The locktable's masterLock must be held at entry.
*/
......@@ -1212,7 +1162,8 @@ RemoveFromWaitQueue(PGPROC *proc)
/*
* LockRelease -- look up 'locktag' in lock table 'lockmethodid' and
* release one 'lockmode' lock on it.
* release one 'lockmode' lock on it. Release a session lock if
* 'sessionLock' is true, else release a regular transaction lock.
*
* Side Effects: find any waiting processes that are now wakable,
* grant them their requested locks and awaken them.
......@@ -1222,7 +1173,7 @@ RemoveFromWaitQueue(PGPROC *proc)
*/
bool
LockRelease(LOCKMETHODID lockmethodid, LOCKTAG *locktag,
TransactionId xid, LOCKMODE lockmode)
LOCKMODE lockmode, bool sessionLock)
{
LOCALLOCKTAG localtag;
LOCALLOCK *locallock;
......@@ -1252,7 +1203,6 @@ LockRelease(LOCKMETHODID lockmethodid, LOCKTAG *locktag,
*/
MemSet(&localtag, 0, sizeof(localtag)); /* must clear padding */
localtag.lock = *locktag;
localtag.xid = xid;
localtag.mode = lockmode;
locallock = (LOCALLOCK *) hash_search(LockMethodLocalHash[lockmethodid],
......@@ -1279,8 +1229,7 @@ LockRelease(LOCKMETHODID lockmethodid, LOCKTAG *locktag,
int i;
/* Session locks and user locks are not transactional */
if (xid != InvalidTransactionId &&
lockmethodid == DEFAULT_LOCKMETHOD)
if (!sessionLock && lockmethodid == DEFAULT_LOCKMETHOD)
owner = CurrentResourceOwner;
else
owner = NULL;
......@@ -1367,15 +1316,11 @@ LockRelease(LOCKMETHODID lockmethodid, LOCKTAG *locktag,
* are held by the current process.
*
* Well, not necessarily *all* locks. The available behaviors are:
*
* allxids == true: release all locks regardless of transaction
* affiliation.
*
* allxids == false: release all locks with Xid != 0
* (zero is the Xid used for "session" locks).
* allLocks == true: release all locks including session locks.
* allLocks == false: release all non-session locks.
*/
void
LockReleaseAll(LOCKMETHODID lockmethodid, bool allxids)
LockReleaseAll(LOCKMETHODID lockmethodid, bool allLocks)
{
HASH_SEQ_STATUS status;
SHM_QUEUE *procLocks = &(MyProc->procLocks);
......@@ -1427,13 +1372,41 @@ LockReleaseAll(LOCKMETHODID lockmethodid, bool allxids)
continue;
/*
* Ignore locks with Xid=0 unless we are asked to release all
* locks
* If we are asked to release all locks, we can just zap the
* entry. Otherwise, must scan to see if there are session locks.
* We assume there is at most one lockOwners entry for session locks.
*/
if (TransactionIdEquals(locallock->tag.xid, InvalidTransactionId)
&& !allxids)
if (!allLocks)
{
LOCALLOCKOWNER *lockOwners = locallock->lockOwners;
/* If it's above array position 0, move it down to 0 */
for (i = locallock->numLockOwners - 1; i > 0; i--)
{
if (lockOwners[i].owner == NULL)
{
lockOwners[0] = lockOwners[i];
break;
}
}
if (locallock->numLockOwners > 0 &&
lockOwners[0].owner == NULL &&
lockOwners[0].nLocks > 0)
{
/* Fix the locallock to show just the session locks */
locallock->nLocks = lockOwners[0].nLocks;
locallock->numLockOwners = 1;
/* We aren't deleting this locallock, so done */
continue;
}
}
/* Mark the proclock to show we need to release this lockmode */
if (locallock->nLocks > 0)
locallock->proclock->releaseMask |= LOCKBIT_ON(locallock->tag.mode);
/* And remove the locallock hashtable entry */
RemoveLocalLock(locallock);
}
......@@ -1460,11 +1433,19 @@ LockReleaseAll(LOCKMETHODID lockmethodid, bool allxids)
goto next_item;
/*
* Ignore locks with Xid=0 unless we are asked to release all
* locks
* In allLocks mode, force release of all locks even if locallock
* table had problems
*/
if (TransactionIdEquals(proclock->tag.xid, InvalidTransactionId)
&& !allxids)
if (allLocks)
proclock->releaseMask = proclock->holdMask;
else
Assert((proclock->releaseMask & ~proclock->holdMask) == 0);
/*
* Ignore items that have nothing to be released, unless they have
* holdMask == 0 and are therefore recyclable
*/
if (proclock->releaseMask == 0 && proclock->holdMask != 0)
goto next_item;
PROCLOCK_PRINT("LockReleaseAll", proclock);
......@@ -1475,22 +1456,19 @@ LockReleaseAll(LOCKMETHODID lockmethodid, bool allxids)
Assert((proclock->holdMask & ~lock->grantMask) == 0);
/*
* fix the general lock stats
* Release the previously-marked lock modes
*/
if (proclock->holdMask)
{
for (i = 1; i <= numLockModes; i++)
{
if (proclock->holdMask & LOCKBIT_ON(i))
if (proclock->releaseMask & LOCKBIT_ON(i))
wakeupNeeded |= UnGrantLock(lock, i, proclock,
lockMethodTable);
}
}
Assert((lock->nRequested >= 0) && (lock->nGranted >= 0));
Assert(lock->nGranted <= lock->nRequested);
LOCK_PRINT("LockReleaseAll: updated", lock, 0);
Assert(proclock->holdMask == 0);
proclock->releaseMask = 0;
/* CleanUpLock will wake up waiters if needed. */
CleanUpLock(lockmethodid, lock, proclock, wakeupNeeded);
......@@ -1528,8 +1506,6 @@ LockReleaseCurrentOwner(void)
/* Ignore items that must be nontransactional */
if (LOCALLOCK_LOCKMETHOD(*locallock) != DEFAULT_LOCKMETHOD)
continue;
if (TransactionIdEquals(locallock->tag.xid, InvalidTransactionId))
continue;
/* Scan to see if there are any locks belonging to current owner */
lockOwners = locallock->lockOwners;
......@@ -1558,8 +1534,8 @@ LockReleaseCurrentOwner(void)
locallock->nLocks = 1;
if (!LockRelease(DEFAULT_LOCKMETHOD,
&locallock->tag.lock,
locallock->tag.xid,
locallock->tag.mode))
locallock->tag.mode,
false))
elog(WARNING, "LockReleaseCurrentOwner: failed??");
}
break;
......@@ -1594,8 +1570,6 @@ LockReassignCurrentOwner(void)
/* Ignore items that must be nontransactional */
if (LOCALLOCK_LOCKMETHOD(*locallock) != DEFAULT_LOCKMETHOD)
continue;
if (TransactionIdEquals(locallock->tag.xid, InvalidTransactionId))
continue;
/*
* Scan to see if there are any locks belonging to current owner
......
......@@ -8,7 +8,7 @@
*
*
* IDENTIFICATION
* $PostgreSQL: pgsql/src/backend/storage/lmgr/proc.c,v 1.158 2005/05/19 21:35:46 tgl Exp $
* $PostgreSQL: pgsql/src/backend/storage/lmgr/proc.c,v 1.159 2005/06/14 22:15:32 tgl Exp $
*
*-------------------------------------------------------------------------
*/
......@@ -644,8 +644,7 @@ ProcSleep(LockMethod lockMethodTable,
lockmode,
lock,
proclock,
MyProc,
NULL) == STATUS_OK)
MyProc) == STATUS_OK)
{
/* Skip the wait and just grant myself the lock. */
GrantLock(lock, proclock, lockmode);
......@@ -846,8 +845,7 @@ ProcLockWakeup(LockMethod lockMethodTable, LOCK *lock)
lockmode,
lock,
proc->waitProcLock,
proc,
NULL) == STATUS_OK)
proc) == STATUS_OK)
{
/* OK to waken */
GrantLock(lock, proc->waitProcLock, lockmode);
......
......@@ -7,7 +7,7 @@
* Portions Copyright (c) 1996-2005, PostgreSQL Global Development Group
* Portions Copyright (c) 1994, Regents of the University of California
*
* $PostgreSQL: pgsql/src/include/storage/lmgr.h,v 1.48 2005/04/30 19:03:33 tgl Exp $
* $PostgreSQL: pgsql/src/include/storage/lmgr.h,v 1.49 2005/06/14 22:15:33 tgl Exp $
*
*-------------------------------------------------------------------------
*/
......@@ -49,7 +49,8 @@ extern void LockRelation(Relation relation, LOCKMODE lockmode);
extern bool ConditionalLockRelation(Relation relation, LOCKMODE lockmode);
extern void UnlockRelation(Relation relation, LOCKMODE lockmode);
extern void LockRelationForSession(LockRelId *relid, LOCKMODE lockmode);
extern void LockRelationForSession(LockRelId *relid, bool istemprel,
LOCKMODE lockmode);
extern void UnlockRelationForSession(LockRelId *relid, LOCKMODE lockmode);
/* Lock a relation for extension */
......
......@@ -7,7 +7,7 @@
* Portions Copyright (c) 1996-2005, PostgreSQL Global Development Group
* Portions Copyright (c) 1994, Regents of the University of California
*
* $PostgreSQL: pgsql/src/include/storage/lock.h,v 1.87 2005/05/29 22:45:02 tgl Exp $
* $PostgreSQL: pgsql/src/include/storage/lock.h,v 1.88 2005/06/14 22:15:33 tgl Exp $
*
*-------------------------------------------------------------------------
*/
......@@ -212,6 +212,10 @@ typedef struct LOCKTAG
* nRequested -- total requested locks of all types.
* granted -- count of each lock type currently granted on the lock.
* nGranted -- total granted locks of all types.
*
* Note: these counts count 1 for each backend. Internally to a backend,
* there may be multiple grabs on a particular lock, but this is not reflected
* into shared memory.
*/
typedef struct LOCK
{
......@@ -235,7 +239,7 @@ typedef struct LOCK
/*
* We may have several different transactions holding or awaiting locks
* We may have several different backends holding or awaiting locks
* on the same lockable object. We need to store some per-holder/waiter
* information for each such holder (or would-be holder). This is kept in
* a PROCLOCK struct.
......@@ -244,14 +248,11 @@ typedef struct LOCK
* proclock hashtable. A PROCLOCKTAG value uniquely identifies the combination
* of a lockable object and a holder/waiter for that object.
*
* There are two possible kinds of proclock owners: a transaction (identified
* both by the PGPROC of the backend running it, and the xact's own ID) and
* a session (identified by backend PGPROC, with XID = InvalidTransactionId).
*
* Currently, session proclocks are used for user locks and for cross-xact
* locks obtained for VACUUM. Note that a single backend can hold locks
* under several different XIDs at once (including session locks). We treat
* such locks as never conflicting (a backend can never block itself).
* Internally to a backend, it is possible for the same lock to be held
* for different purposes: the backend tracks transaction locks separately
* from session locks. However, this is not reflected in the shared-memory
* state: we only track which backend(s) hold the lock. This is OK since a
* backend can never block itself.
*
* The holdMask field shows the already-granted locks represented by this
* proclock. Note that there will be a proclock object, possibly with
......@@ -259,6 +260,10 @@ typedef struct LOCK
* Otherwise, proclock objects whose holdMasks are zero are recycled
* as soon as convenient.
*
* releaseMask is workspace for LockReleaseAll(): it shows the locks due
* to be released during the current call. This must only be examined or
* set by the backend owning the PROCLOCK.
*
* Each PROCLOCK object is linked into lists for both the associated LOCK
* object and the owning PGPROC object. Note that the PROCLOCK is entered
* into these lists as soon as it is created, even if no lock has yet been
......@@ -269,7 +274,6 @@ typedef struct PROCLOCKTAG
{
SHMEM_OFFSET lock; /* link to per-lockable-object information */
SHMEM_OFFSET proc; /* link to PGPROC of owning backend */
TransactionId xid; /* xact ID, or InvalidTransactionId */
} PROCLOCKTAG;
typedef struct PROCLOCK
......@@ -279,9 +283,9 @@ typedef struct PROCLOCK
/* data */
LOCKMASK holdMask; /* bitmask for lock types currently held */
SHM_QUEUE lockLink; /* list link for lock's list of proclocks */
SHM_QUEUE procLink; /* list link for process's list of
* proclocks */
LOCKMASK releaseMask; /* bitmask for lock types to be released */
SHM_QUEUE lockLink; /* list link in LOCK's list of proclocks */
SHM_QUEUE procLink; /* list link in PGPROC's list of proclocks */
} PROCLOCK;
#define PROCLOCK_LOCKMETHOD(proclock) \
......@@ -299,15 +303,16 @@ typedef struct PROCLOCK
typedef struct LOCALLOCKTAG
{
LOCKTAG lock; /* identifies the lockable object */
TransactionId xid; /* xact ID, or InvalidTransactionId */
LOCKMODE mode; /* lock mode for this table entry */
} LOCALLOCKTAG;
typedef struct LOCALLOCKOWNER
{
/*
* Note: owner can be NULL to indicate a non-transactional lock. Must
* use a forward struct reference to avoid circularity.
* Note: if owner is NULL then the lock is held on behalf of the session;
* otherwise it is held on behalf of my current transaction.
*
* Must use a forward struct reference to avoid circularity.
*/
struct ResourceOwnerData *owner;
int nLocks; /* # of times held by this owner */
......@@ -321,6 +326,7 @@ typedef struct LOCALLOCK
/* data */
LOCK *lock; /* associated LOCK object in shared mem */
PROCLOCK *proclock; /* associated PROCLOCK object in shmem */
bool isTempObject; /* true if lock is on a temporary object */
int nLocks; /* total number of times lock is held */
int numLockOwners; /* # of relevant ResourceOwners */
int maxLockOwners; /* allocated size of array */
......@@ -366,17 +372,20 @@ extern LOCKMETHODID LockMethodTableInit(const char *tabName,
const LOCKMASK *conflictsP,
int numModes, int maxBackends);
extern LOCKMETHODID LockMethodTableRename(LOCKMETHODID lockmethodid);
extern LockAcquireResult LockAcquire(LOCKMETHODID lockmethodid, LOCKTAG *locktag,
TransactionId xid, LOCKMODE lockmode, bool dontWait);
extern LockAcquireResult LockAcquire(LOCKMETHODID lockmethodid,
LOCKTAG *locktag,
bool isTempObject,
LOCKMODE lockmode,
bool sessionLock,
bool dontWait);
extern bool LockRelease(LOCKMETHODID lockmethodid, LOCKTAG *locktag,
TransactionId xid, LOCKMODE lockmode);
extern void LockReleaseAll(LOCKMETHODID lockmethodid, bool allxids);
LOCKMODE lockmode, bool sessionLock);
extern void LockReleaseAll(LOCKMETHODID lockmethodid, bool allLocks);
extern void LockReleaseCurrentOwner(void);
extern void LockReassignCurrentOwner(void);
extern int LockCheckConflicts(LockMethod lockMethodTable,
LOCKMODE lockmode,
LOCK *lock, PROCLOCK *proclock, PGPROC *proc,
int *myHolding);
LOCK *lock, PROCLOCK *proclock, PGPROC *proc);
extern void GrantLock(LOCK *lock, PROCLOCK *proclock, LOCKMODE lockmode);
extern void GrantAwaitedLock(void);
extern void RemoveFromWaitQueue(PGPROC *proc);
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment