Commit 6cc842ab authored by Tom Lane's avatar Tom Lane

Revise lock manager to support "session level" locks as well as "transaction

level" locks.  A session lock is not released at transaction commit (but it
is released on transaction abort, to ensure recovery after an elog(ERROR)).
In VACUUM, use a session lock to protect the master table while vacuuming a
TOAST table, so that the TOAST table can be done in an independent
transaction.

I also took this opportunity to do some cleanup and renaming in the lock
code.  The previously noted bug in ProcLockWakeup, that it couldn't wake up
any waiters beyond the first non-wakeable waiter, is now fixed.  Also found
a previously unknown bug of the same kind (failure to scan all members of
a lock queue in some cases) in DeadLockCheck.  This might have led to failure
to detect a deadlock condition, resulting in indefinite waits, but it's
difficult to characterize the conditions required to trigger a failure.
parent b2145e93
......@@ -33,7 +33,7 @@ user_lock(uint32 id1, uint32 id2, LOCKMODE lockmode)
tag.objId.blkno = (BlockNumber) id2;
tag.offnum = (OffsetNumber) (id1 & 0xffff);
return LockAcquire(USER_LOCKMETHOD, &tag, lockmode);
return LockAcquire(USER_LOCKMETHOD, &tag, InvalidTransactionId, lockmode);
}
int
......@@ -47,7 +47,7 @@ user_unlock(uint32 id1, uint32 id2, LOCKMODE lockmode)
tag.objId.blkno = (BlockNumber) id2;
tag.offnum = (OffsetNumber) (id1 & 0xffff);
return LockRelease(USER_LOCKMETHOD, &tag, lockmode);
return LockRelease(USER_LOCKMETHOD, &tag, InvalidTransactionId, lockmode);
}
int
......@@ -89,7 +89,7 @@ user_unlock_all()
}
proc = (PROC *) MAKE_PTR(location);
return LockReleaseAll(USER_LOCKMETHOD, &proc->lockQueue);
return LockReleaseAll(USER_LOCKMETHOD, proc, false, InvalidTransactionId);
}
/* end of file */
......
......@@ -8,7 +8,7 @@
*
*
* IDENTIFICATION
* $Header: /cvsroot/pgsql/src/backend/access/transam/xact.c,v 1.89 2000/12/18 00:44:45 tgl Exp $
* $Header: /cvsroot/pgsql/src/backend/access/transam/xact.c,v 1.90 2000/12/22 00:51:53 tgl Exp $
*
* NOTES
* Transaction aborts can now occur two ways:
......@@ -741,7 +741,7 @@ AtCommit_Locks(void)
* Then you're up a creek! -mer 5/24/92
* ----------------
*/
ProcReleaseLocks();
ProcReleaseLocks(true);
}
/* --------------------------------
......@@ -828,7 +828,7 @@ AtAbort_Locks(void)
* Then you're up a creek without a paddle! -mer
* ----------------
*/
ProcReleaseLocks();
ProcReleaseLocks(false);
}
......
......@@ -8,7 +8,7 @@
*
*
* IDENTIFICATION
* $Header: /cvsroot/pgsql/src/backend/commands/vacuum.c,v 1.177 2000/12/08 06:43:44 inoue Exp $
* $Header: /cvsroot/pgsql/src/backend/commands/vacuum.c,v 1.178 2000/12/22 00:51:53 tgl Exp $
*
*-------------------------------------------------------------------------
*/
......@@ -61,7 +61,7 @@ static void vacuum_init(void);
static void vacuum_shutdown(void);
static void vac_vacuum(NameData *VacRelP, bool analyze, List *anal_cols2);
static VRelList getrels(NameData *VacRelP);
static void vacuum_rel(Oid relid, bool is_toastrel);
static void vacuum_rel(Oid relid);
static void scan_heap(VRelStats *vacrelstats, Relation onerel, VacPageList vacuum_pages, VacPageList fraged_pages);
static void repair_frag(VRelStats *vacrelstats, Relation onerel, VacPageList vacuum_pages, VacPageList fraged_pages, int nindices, Relation *Irel);
static void vacuum_heap(VRelStats *vacrelstats, Relation onerel, VacPageList vacpagelist);
......@@ -239,7 +239,7 @@ vac_vacuum(NameData *VacRelP, bool analyze, List *anal_cols2)
/* vacuum each heap relation */
for (cur = vrl; cur != (VRelList) NULL; cur = cur->vrl_next)
{
vacuum_rel(cur->vrl_relid, false);
vacuum_rel(cur->vrl_relid);
/* analyze separately so locking is minimized */
if (analyze)
analyze_rel(cur->vrl_relid, anal_cols2, MESSAGE_LEVEL);
......@@ -308,7 +308,7 @@ getrels(NameData *VacRelP)
if (rkind != RELKIND_RELATION)
{
elog(NOTICE, "Vacuum: can not process indecies, views and certain system tables");
elog(NOTICE, "Vacuum: can not process indices, views and certain system tables");
continue;
}
......@@ -342,23 +342,25 @@ getrels(NameData *VacRelP)
* vacuum_rel() -- vacuum one heap relation
*
* This routine vacuums a single heap, cleans out its indices, and
* updates its statistics num_pages and num_tuples statistics.
* updates its num_pages and num_tuples statistics.
*
* Doing one heap at a time incurs extra overhead, since we need to
* check that the heap exists again just before we vacuum it. The
* reason that we do this is so that vacuuming can be spread across
* many small transactions. Otherwise, two-phase locking would require
* us to lock the entire database during one pass of the vacuum cleaner.
*
* At entry and exit, we are not inside a transaction.
*/
static void
vacuum_rel(Oid relid, bool is_toastrel)
vacuum_rel(Oid relid)
{
Relation onerel;
LockRelId onerelid;
VacPageListData vacuum_pages; /* List of pages to vacuum and/or clean
* indices */
* indices */
VacPageListData fraged_pages; /* List of pages with space enough for
* re-using */
VacPage *vacpage;
* re-using */
Relation *Irel;
int32 nindices,
i;
......@@ -366,8 +368,8 @@ vacuum_rel(Oid relid, bool is_toastrel)
bool reindex = false;
Oid toast_relid;
if (!is_toastrel)
StartTransactionCommand();
/* Begin a transaction for vacuuming this relation */
StartTransactionCommand();
/*
* Check for user-requested abort. Note we want this to be inside a
......@@ -384,8 +386,7 @@ vacuum_rel(Oid relid, bool is_toastrel)
ObjectIdGetDatum(relid),
0, 0, 0))
{
if (!is_toastrel)
CommitTransactionCommand();
CommitTransactionCommand();
return;
}
......@@ -403,13 +404,25 @@ vacuum_rel(Oid relid, bool is_toastrel)
elog(NOTICE, "Skipping \"%s\" --- only table owner can VACUUM it",
RelationGetRelationName(onerel));
heap_close(onerel, AccessExclusiveLock);
if (!is_toastrel)
CommitTransactionCommand();
CommitTransactionCommand();
return;
}
/*
* Remember the relation'ss TOAST relation for later
* Get a session-level exclusive lock too. This will protect our
* exclusive access to the relation across multiple transactions,
* so that we can vacuum the relation's TOAST table (if any) secure
* in the knowledge that no one is diddling the parent relation.
*
* NOTE: this cannot block, even if someone else is waiting for access,
* because the lock manager knows that both lock requests are from the
* same process.
*/
onerelid = onerel->rd_lockInfo.lockRelId;
LockRelationForSession(&onerelid, AccessExclusiveLock);
/*
* Remember the relation's TOAST relation for later
*/
toast_relid = onerel->rd_rel->reltoastrelid;
......@@ -500,21 +513,6 @@ vacuum_rel(Oid relid, bool is_toastrel)
if (reindex)
activate_indexes_of_a_table(relid, true);
/*
* ok - free vacuum_pages list of reaped pages
*
* Isn't this a waste of code? Upcoming commit should free memory, no?
*/
if (vacuum_pages.num_pages > 0)
{
vacpage = vacuum_pages.pagedesc;
for (i = 0; i < vacuum_pages.num_pages; i++, vacpage++)
pfree(*vacpage);
pfree(vacuum_pages.pagedesc);
if (fraged_pages.num_pages > 0)
pfree(fraged_pages.pagedesc);
}
/* all done with this class, but hold lock until commit */
heap_close(onerel, NoLock);
......@@ -523,19 +521,25 @@ vacuum_rel(Oid relid, bool is_toastrel)
vacrelstats->num_tuples, vacrelstats->hasindex,
vacrelstats);
/*
* Complete the transaction and free all temporary memory used.
*/
CommitTransactionCommand();
/*
* If the relation has a secondary toast one, vacuum that too
* while we still hold the lock on the master table. We don't
* need to propagate "analyze" to it, because the toaster
* while we still hold the session lock on the master table.
* We don't need to propagate "analyze" to it, because the toaster
* always uses hardcoded index access and statistics are
* totally unimportant for toast relations
*/
if (toast_relid != InvalidOid)
vacuum_rel(toast_relid, true);
vacuum_rel(toast_relid);
/* next command frees attribute stats */
if (!is_toastrel)
CommitTransactionCommand();
/*
* Now release the session-level lock on the master table.
*/
UnlockRelationForSession(&onerelid, AccessExclusiveLock);
}
/*
......@@ -1786,9 +1790,13 @@ failed to add item with len = %lu to page %u (free space %lu, nusd %u, noff %u)"
if (num_moved > 0)
{
/*
* We have to commit our tuple' movings before we'll truncate
* relation, but we shouldn't lose our locks. And so - quick hack:
* record status of current transaction as committed, and continue.
* We have to commit our tuple movings before we truncate the
* relation. Ideally we should do Commit/StartTransactionCommand
* here, relying on the session-level table lock to protect our
* exclusive access to the relation. However, that would require
* a lot of extra code to close and re-open the relation, indices,
* etc. For now, a quick hack: record status of current transaction
* as committed, and continue.
*/
RecordTransactionCommit();
}
......@@ -1852,7 +1860,7 @@ failed to add item with len = %lu to page %u (free space %lu, nusd %u, noff %u)"
/*
* Reflect the motion of system tuples to catalog cache here.
*/
CommandCounterIncrement();
CommandCounterIncrement();
if (Nvacpagelist.num_pages > 0)
{
......
......@@ -8,7 +8,7 @@
*
*
* IDENTIFICATION
* $Header: /cvsroot/pgsql/src/backend/storage/ipc/ipci.c,v 1.37 2000/12/03 17:18:10 tgl Exp $
* $Header: /cvsroot/pgsql/src/backend/storage/ipc/ipci.c,v 1.38 2000/12/22 00:51:54 tgl Exp $
*
*-------------------------------------------------------------------------
*/
......@@ -84,7 +84,7 @@ CreateSharedMemoryAndSemaphores(bool makePrivate, int maxBackends)
* Set up lock manager
*/
InitLocks();
if (InitLockTable() == INVALID_TABLEID)
if (InitLockTable(maxBackends) == INVALID_TABLEID)
elog(FATAL, "Couldn't create the lock table");
/*
......
$Header: /cvsroot/pgsql/src/backend/storage/lmgr/README,v 1.3 1998/07/06 18:16:07 momjian Exp $
$Header: /cvsroot/pgsql/src/backend/storage/lmgr/README,v 1.4 2000/12/22 00:51:54 tgl Exp $
There are two fundemental lock structures. Lock methods describe the
locking behavior. We currently only support multi-level locking. Lock
modes describe the mode of the lock(read/write or shared/exclusive).
There are two fundamental lock structures: the per-lockable-object LOCK
struct, and the per-lock-holder HOLDER struct. A LOCK object exists
for each lockable object that currently has locks held or requested on it.
A HOLDER struct exists for each transaction that is holding or requesting
lock(s) on each LOCK object.
Lock methods describe the overall locking behavior. Currently there are
two lock methods: DEFAULT and USER. (USER locks are non-blocking.)
Lock modes describe the type of the lock (read/write or shared/exclusive).
See src/tools/backend/index.html and src/include/storage/lock.h for more
details.
......@@ -12,10 +19,10 @@ The lock manager's LOCK:
tag -
The key fields that are used for hashing locks in the shared memory
lock hash table. This is kept as a separate struct to ensure that we
always zero out the correct number of bytes. This is a problem as
part of the tag is an itempointer which is 6 bytes and causes 2
additional bytes to be added as padding.
lock hash table. This is declared as a separate struct to ensure that
we always zero out the correct number of bytes. It is critical that
any alignment-padding bytes the compiler might insert in the struct
be zeroed out, else the hash computation will be random.
tag.relId -
Uniquely identifies the relation that the lock corresponds to.
......@@ -30,7 +37,7 @@ tag -
tuple within the block. If we are setting a table level lock
both the blockId and tupleId (in an item pointer this is called
the position) are set to invalid, if it is a page level lock the
blockId is valid, while the tuleId is still invalid. Finally if
blockId is valid, while the tupleId is still invalid. Finally if
this is a tuple level lock (we currently never do this) then both
the blockId and tupleId are set to valid specifications. This is
how we get the appearance of a multi-level lock table while using
......@@ -38,9 +45,9 @@ tag -
you are puzzled about how multi-level lock tables work).
mask -
This field indicates what types of locks are currently held in the
given lock. It is used (against the lock table's conflict table)
to determine if the new lock request will conflict with existing
This field indicates what types of locks are currently held on the
given lockable object. It is used (against the lock table's conflict
table) to determine if the new lock request will conflict with existing
lock types held. Conficts are determined by bitwise AND operations
between the mask and the conflict table entry for the given lock type
to be set. The current representation is that each bit (1 through 5)
......@@ -73,7 +80,7 @@ holders -
nActive -
Keeps a count of how many times this lock has been succesfully acquired.
This count does not include attempts that were rejected due to conflicts,
This count does not include attempts that are waiting due to conflicts,
but can count the same backend twice (e.g. a read then a write -- since
its the same transaction this won't cause a conflict)
......@@ -85,3 +92,39 @@ activeHolders -
---------------------------------------------------------------------------
The lock manager's HOLDER:
tag -
The key fields that are used for hashing entries in the shared memory
holder hash table. This is declared as a separate struct to ensure that
we always zero out the correct number of bytes.
tag.lock
SHMEM offset of the LOCK object this holder is for.
tag.pid
PID of backend process that owns this holder.
tag.xid
XID of transaction this holder is for, or InvalidTransactionId
if the holder is for session-level locking.
Note that this structure will support multiple transactions running
concurrently in one backend, which may be handy if we someday decide
to support nested transactions. Currently, the XID field is only needed
to distinguish per-transaction locks from session locks. User locks
are always session locks, and we also use session locks for multi-
transaction operations like VACUUM.
holders -
The number of successfully acquired locks of each type for this holder.
(CAUTION: the semantics are not the same as the LOCK's holder[], which
counts both acquired and pending requests. Probably a different name
should be used...)
nHolding -
Sum of the holders[] array.
queue -
List link for shared memory queue of all the HOLDER objects for the
same backend.
......@@ -8,7 +8,7 @@
*
*
* IDENTIFICATION
* $Header: /cvsroot/pgsql/src/backend/storage/lmgr/lmgr.c,v 1.42 2000/11/30 01:39:08 tgl Exp $
* $Header: /cvsroot/pgsql/src/backend/storage/lmgr/lmgr.c,v 1.43 2000/12/22 00:51:54 tgl Exp $
*
*-------------------------------------------------------------------------
*/
......@@ -16,6 +16,7 @@
#include "postgres.h"
#include "access/transam.h"
#include "access/xact.h"
#include "catalog/catalog.h"
#include "miscadmin.h"
#include "storage/lmgr.h"
......@@ -72,16 +73,17 @@ LOCKMETHOD LongTermTableId = (LOCKMETHOD) NULL;
* Create the lock table described by LockConflicts and LockPrios.
*/
LOCKMETHOD
InitLockTable()
InitLockTable(int maxBackends)
{
int lockmethod;
lockmethod = LockMethodTableInit("LockTable",
LockConflicts, LockPrios, MAX_LOCKMODES - 1);
LockConflicts, LockPrios,
MAX_LOCKMODES - 1, maxBackends);
LockTableId = lockmethod;
if (!(LockTableId))
elog(ERROR, "InitLockTable: couldnt initialize lock table");
elog(ERROR, "InitLockTable: couldn't initialize lock table");
#ifdef USER_LOCKS
......@@ -90,10 +92,7 @@ InitLockTable()
*/
LongTermTableId = LockMethodTableRename(LockTableId);
if (!(LongTermTableId))
{
elog(ERROR,
"InitLockTable: couldn't rename long-term lock table");
}
elog(ERROR, "InitLockTable: couldn't rename long-term lock table");
#endif
return LockTableId;
......@@ -139,7 +138,7 @@ LockRelation(Relation relation, LOCKMODE lockmode)
tag.dbId = relation->rd_lockInfo.lockRelId.dbId;
tag.objId.blkno = InvalidBlockNumber;
if (!LockAcquire(LockTableId, &tag, lockmode))
if (!LockAcquire(LockTableId, &tag, GetCurrentTransactionId(), lockmode))
elog(ERROR, "LockRelation: LockAcquire failed");
/*
......@@ -169,7 +168,55 @@ UnlockRelation(Relation relation, LOCKMODE lockmode)
tag.dbId = relation->rd_lockInfo.lockRelId.dbId;
tag.objId.blkno = InvalidBlockNumber;
LockRelease(LockTableId, &tag, lockmode);
LockRelease(LockTableId, &tag, GetCurrentTransactionId(), lockmode);
}
/*
* LockRelationForSession
*
* This routine grabs a session-level lock on the target relation. The
* session lock persists across transaction boundaries. It will be removed
* when UnlockRelationForSession() is called, or if an elog(ERROR) occurs,
* or if the backend exits.
*
* Note that one should also grab a transaction-level lock on the rel
* in any transaction that actually uses the rel, to ensure that the
* relcache entry is up to date.
*/
void
LockRelationForSession(LockRelId *relid, LOCKMODE lockmode)
{
LOCKTAG tag;
if (LockingDisabled())
return;
MemSet(&tag, 0, sizeof(tag));
tag.relId = relid->relId;
tag.dbId = relid->dbId;
tag.objId.blkno = InvalidBlockNumber;
if (!LockAcquire(LockTableId, &tag, InvalidTransactionId, lockmode))
elog(ERROR, "LockRelationForSession: LockAcquire failed");
}
/*
* UnlockRelationForSession
*/
void
UnlockRelationForSession(LockRelId *relid, LOCKMODE lockmode)
{
LOCKTAG tag;
if (LockingDisabled())
return;
MemSet(&tag, 0, sizeof(tag));
tag.relId = relid->relId;
tag.dbId = relid->dbId;
tag.objId.blkno = InvalidBlockNumber;
LockRelease(LockTableId, &tag, InvalidTransactionId, lockmode);
}
/*
......@@ -188,7 +235,7 @@ LockPage(Relation relation, BlockNumber blkno, LOCKMODE lockmode)
tag.dbId = relation->rd_lockInfo.lockRelId.dbId;
tag.objId.blkno = blkno;
if (!LockAcquire(LockTableId, &tag, lockmode))
if (!LockAcquire(LockTableId, &tag, GetCurrentTransactionId(), lockmode))
elog(ERROR, "LockPage: LockAcquire failed");
}
......@@ -208,7 +255,7 @@ UnlockPage(Relation relation, BlockNumber blkno, LOCKMODE lockmode)
tag.dbId = relation->rd_lockInfo.lockRelId.dbId;
tag.objId.blkno = blkno;
LockRelease(LockTableId, &tag, lockmode);
LockRelease(LockTableId, &tag, GetCurrentTransactionId(), lockmode);
}
void
......@@ -221,10 +268,10 @@ XactLockTableInsert(TransactionId xid)
MemSet(&tag, 0, sizeof(tag));
tag.relId = XactLockTableId;
tag.dbId = InvalidOid;
tag.dbId = InvalidOid; /* xids are globally unique */
tag.objId.xid = xid;
if (!LockAcquire(LockTableId, &tag, ExclusiveLock))
if (!LockAcquire(LockTableId, &tag, xid, ExclusiveLock))
elog(ERROR, "XactLockTableInsert: LockAcquire failed");
}
......@@ -242,7 +289,7 @@ XactLockTableDelete(TransactionId xid)
tag.dbId = InvalidOid;
tag.objId.xid = xid;
LockRelease(LockTableId, &tag, ExclusiveLock);
LockRelease(LockTableId, &tag, xid, ExclusiveLock);
}
#endif
......@@ -259,10 +306,10 @@ XactLockTableWait(TransactionId xid)
tag.dbId = InvalidOid;
tag.objId.xid = xid;
if (!LockAcquire(LockTableId, &tag, ShareLock))
if (!LockAcquire(LockTableId, &tag, GetCurrentTransactionId(), ShareLock))
elog(ERROR, "XactLockTableWait: LockAcquire failed");
LockRelease(LockTableId, &tag, ShareLock);
LockRelease(LockTableId, &tag, GetCurrentTransactionId(), ShareLock);
/*
* Transaction was committed/aborted/crashed - we have to update
......
/*-------------------------------------------------------------------------
*
* lock.c
* simple lock acquisition
* POSTGRES low-level lock mechanism
*
* Portions Copyright (c) 1996-2000, PostgreSQL, Inc
* Portions Copyright (c) 1994, Regents of the University of California
*
*
* IDENTIFICATION
* $Header: /cvsroot/pgsql/src/backend/storage/lmgr/lock.c,v 1.73 2000/11/28 23:27:56 tgl Exp $
* $Header: /cvsroot/pgsql/src/backend/storage/lmgr/lock.c,v 1.74 2000/12/22 00:51:54 tgl Exp $
*
* NOTES
* Outside modules can create a lock table and acquire/release
* locks. A lock table is a shared memory hash table. When
* a process tries to acquire a lock of a type that conflictRs
* a process tries to acquire a lock of a type that conflicts
* with existing locks, it is put to sleep using the routines
* in storage/lmgr/proc.c.
*
* For the most part, this code should be invoked via lmgr.c
* or another lock-management module, not directly.
*
* Interface:
*
* LockAcquire(), LockRelease(), LockMethodTableInit(),
* LockMethodTableRename(), LockReleaseAll,
* LockResolveConflicts(), GrantLock()
*
* NOTE: This module is used to define new lock tables. The
* multi-level lock table (multi.c) used by the heap
* access methods calls these routines. See multi.c for
* examples showing how to use this interface.
*
*-------------------------------------------------------------------------
*/
#include <sys/types.h>
......@@ -42,7 +40,11 @@
#include "utils/memutils.h"
#include "utils/ps_status.h"
static int WaitOnLock(LOCKMETHOD lockmethod, LOCK *lock, LOCKMODE lockmode);
static int WaitOnLock(LOCKMETHOD lockmethod, LOCKMODE lockmode,
LOCK *lock, HOLDER *holder);
static void LockCountMyLocks(SHMEM_OFFSET lockOffset, PROC *proc,
int *myHolders);
static int LockGetMyHoldLocks(SHMEM_OFFSET lockOffset, PROC *proc);
static char *lock_types[] =
{
......@@ -115,26 +117,27 @@ LOCK_PRINT(const char * where, const LOCK * lock, LOCKMODE type)
inline static void
XID_PRINT(const char * where, const XIDLookupEnt * xidentP)
HOLDER_PRINT(const char * where, const HOLDER * holderP)
{
if (
(((XIDENT_LOCKMETHOD(*xidentP) == DEFAULT_LOCKMETHOD && Trace_locks)
|| (XIDENT_LOCKMETHOD(*xidentP) == USER_LOCKMETHOD && Trace_userlocks))
&& (((LOCK *)MAKE_PTR(xidentP->tag.lock))->tag.relId >= Trace_lock_oidmin))
|| (Trace_lock_table && (((LOCK *)MAKE_PTR(xidentP->tag.lock))->tag.relId == Trace_lock_table))
(((HOLDER_LOCKMETHOD(*holderP) == DEFAULT_LOCKMETHOD && Trace_locks)
|| (HOLDER_LOCKMETHOD(*holderP) == USER_LOCKMETHOD && Trace_userlocks))
&& (((LOCK *)MAKE_PTR(holderP->tag.lock))->tag.relId >= Trace_lock_oidmin))
|| (Trace_lock_table && (((LOCK *)MAKE_PTR(holderP->tag.lock))->tag.relId == Trace_lock_table))
)
elog(DEBUG,
"%s: xid(%lx) lock(%lx) tbl(%d) pid(%d) xid(%u) hold(%d,%d,%d,%d,%d,%d,%d)=%d",
where, MAKE_OFFSET(xidentP), xidentP->tag.lock, XIDENT_LOCKMETHOD(*(xidentP)),
xidentP->tag.pid, xidentP->tag.xid,
xidentP->holders[1], xidentP->holders[2], xidentP->holders[3], xidentP->holders[4],
xidentP->holders[5], xidentP->holders[6], xidentP->holders[7], xidentP->nHolding);
"%s: holder(%lx) lock(%lx) tbl(%d) pid(%d) xid(%u) hold(%d,%d,%d,%d,%d,%d,%d)=%d",
where, MAKE_OFFSET(holderP), holderP->tag.lock,
HOLDER_LOCKMETHOD(*(holderP)),
holderP->tag.pid, holderP->tag.xid,
holderP->holders[1], holderP->holders[2], holderP->holders[3], holderP->holders[4],
holderP->holders[5], holderP->holders[6], holderP->holders[7], holderP->nHolding);
}
#else /* not LOCK_DEBUG */
#define LOCK_PRINT(where, lock, type)
#define XID_PRINT(where, xidentP)
#define HOLDER_PRINT(where, holderP)
#endif /* not LOCK_DEBUG */
......@@ -149,7 +152,7 @@ static LOCKMASK BITS_OFF[MAX_LOCKMODES];
static LOCKMASK BITS_ON[MAX_LOCKMODES];
/* -----------------
* XXX Want to move this to this file
* Disable flag
* -----------------
*/
static bool LockingIsDisabled;
......@@ -168,16 +171,12 @@ static int NumLockMethods;
* -------------------
*/
void
InitLocks()
InitLocks(void)
{
int i;
int bit;
bit = 1;
/* -------------------
* remember 0th lockmode is invalid
* -------------------
*/
for (i = 0; i < MAX_LOCKMODES; i++, bit <<= 1)
{
BITS_ON[i] = bit;
......@@ -190,11 +189,21 @@ InitLocks()
* ------------------
*/
void
LockDisable(int status)
LockDisable(bool status)
{
LockingIsDisabled = status;
}
/* -----------------
* Boolean function to determine current locking status
* -----------------
*/
bool
LockingDisabled(void)
{
return LockingIsDisabled;
}
/*
* LockMethodInit -- initialize the lock table's lock type
......@@ -238,14 +247,16 @@ LOCKMETHOD
LockMethodTableInit(char *tabName,
LOCKMASK *conflictsP,
int *prioP,
int numModes)
int numModes,
int maxBackends)
{
LOCKMETHODTABLE *lockMethodTable;
char *shmemName;
HASHCTL info;
int hash_flags;
bool found;
int status = TRUE;
long init_table_size,
max_table_size;
if (numModes > MAX_LOCKMODES)
{
......@@ -254,6 +265,10 @@ LockMethodTableInit(char *tabName,
return INVALID_LOCKMETHOD;
}
/* Compute init/max size to request for lock hashtables */
max_table_size = NLOCKENTS(maxBackends);
init_table_size = max_table_size / 10;
/* Allocate a string for the shmem index table lookups. */
/* This is just temp space in this routine, so palloc is OK. */
shmemName = (char *) palloc(strlen(tabName) + 32);
......@@ -278,10 +293,7 @@ LockMethodTableInit(char *tabName,
ShmemInitStruct(shmemName, sizeof(LOCKMETHODCTL), &found);
if (!lockMethodTable->ctl)
{
elog(FATAL, "LockMethodTableInit: couldn't initialize %s", tabName);
status = FALSE;
}
/* -------------------
* no zero-th table
......@@ -301,7 +313,7 @@ LockMethodTableInit(char *tabName,
}
/* --------------------
* other modules refer to the lock table by a lockmethod
* other modules refer to the lock table by a lockmethod ID
* --------------------
*/
LockMethodTable[NumLockMethods] = lockMethodTable;
......@@ -309,8 +321,8 @@ LockMethodTableInit(char *tabName,
Assert(NumLockMethods <= MAX_LOCK_METHODS);
/* ----------------------
* allocate a hash table for the lock tags. This is used
* to find the different locks.
* allocate a hash table for LOCK structs. This is used
* to store per-locked-object information.
* ----------------------
*/
info.keysize = SHMEM_LOCKTAB_KEYSIZE;
......@@ -319,37 +331,35 @@ LockMethodTableInit(char *tabName,
hash_flags = (HASH_ELEM | HASH_FUNCTION);
sprintf(shmemName, "%s (lock hash)", tabName);
lockMethodTable->lockHash = (HTAB *) ShmemInitHash(shmemName,
INIT_TABLE_SIZE, MAX_TABLE_SIZE,
&info, hash_flags);
lockMethodTable->lockHash = ShmemInitHash(shmemName,
init_table_size,
max_table_size,
&info,
hash_flags);
Assert(lockMethodTable->lockHash->hash == tag_hash);
if (!lockMethodTable->lockHash)
{
elog(FATAL, "LockMethodTableInit: couldn't initialize %s", tabName);
status = FALSE;
}
Assert(lockMethodTable->lockHash->hash == tag_hash);
/* -------------------------
* allocate an xid table. When different transactions hold
* the same lock, additional information must be saved (locks per tx).
* allocate a hash table for HOLDER structs. This is used
* to store per-lock-holder information.
* -------------------------
*/
info.keysize = SHMEM_XIDTAB_KEYSIZE;
info.datasize = SHMEM_XIDTAB_DATASIZE;
info.keysize = SHMEM_HOLDERTAB_KEYSIZE;
info.datasize = SHMEM_HOLDERTAB_DATASIZE;
info.hash = tag_hash;
hash_flags = (HASH_ELEM | HASH_FUNCTION);
sprintf(shmemName, "%s (xid hash)", tabName);
lockMethodTable->xidHash = (HTAB *) ShmemInitHash(shmemName,
INIT_TABLE_SIZE, MAX_TABLE_SIZE,
&info, hash_flags);
sprintf(shmemName, "%s (holder hash)", tabName);
lockMethodTable->holderHash = ShmemInitHash(shmemName,
init_table_size,
max_table_size,
&info,
hash_flags);
if (!lockMethodTable->xidHash)
{
if (!lockMethodTable->holderHash)
elog(FATAL, "LockMethodTableInit: couldn't initialize %s", tabName);
status = FALSE;
}
/* init ctl data structures */
LockMethodInit(lockMethodTable, conflictsP, prioP, numModes);
......@@ -358,14 +368,11 @@ LockMethodTableInit(char *tabName,
pfree(shmemName);
if (status)
return lockMethodTable->ctl->lockmethod;
else
return INVALID_LOCKMETHOD;
return lockMethodTable->ctl->lockmethod;
}
/*
* LockMethodTableRename -- allocate another lockmethod to the same
* LockMethodTableRename -- allocate another lockmethod ID to the same
* lock table.
*
* NOTES: Both the lock module and the lock chain (lchain.c)
......@@ -374,7 +381,7 @@ LockMethodTableInit(char *tabName,
* the same to the lock table, but are handled differently
* by the lock chain manager. This function allows the
* client to use different lockmethods when acquiring/releasing
* short term and long term locks.
* short term and long term locks, yet store them all in one hashtable.
*/
LOCKMETHOD
......@@ -387,7 +394,7 @@ LockMethodTableRename(LOCKMETHOD lockmethod)
if (LockMethodTable[lockmethod] == INVALID_LOCKMETHOD)
return INVALID_LOCKMETHOD;
/* other modules refer to the lock table by a lockmethod */
/* other modules refer to the lock table by a lockmethod ID */
newLockMethod = NumLockMethods;
NumLockMethods++;
......@@ -405,7 +412,6 @@ LockMethodTableRename(LOCKMETHOD lockmethod)
* a lock acquisition other than aborting the transaction.
* Lock is recorded in the lkchain.
*
#ifdef USER_LOCKS
*
* Note on User Locks:
*
......@@ -432,41 +438,46 @@ LockMethodTableRename(LOCKMETHOD lockmethod)
* automatically when a backend terminates.
* They are indicated by a lockmethod 2 which is an alias for the
* normal lock table, and are distinguished from normal locks
* for the following differences:
* by the following differences:
*
* normal lock user lock
*
* lockmethod 1 2
* tag.relId rel oid 0
* tag.ItemPointerData.ip_blkid block id lock id2
* tag.ItemPointerData.ip_posid tuple offset lock id1
* xid.pid 0 backend pid
* tag.dbId database oid database oid
* tag.relId rel oid or 0 0
* tag.objId block id lock id2
* or xact id
* tag.offnum 0 lock id1
* xid.pid backend pid backend pid
* xid.xid xid or 0 0
* persistence transaction user or backend
* or backend
*
* The lockmode parameter can have the same values for normal locks
* although probably only WRITE_LOCK can have some practical use.
*
* DZ - 22 Nov 1997
#endif
*/
bool
LockAcquire(LOCKMETHOD lockmethod, LOCKTAG *locktag, LOCKMODE lockmode)
LockAcquire(LOCKMETHOD lockmethod, LOCKTAG *locktag,
TransactionId xid, LOCKMODE lockmode)
{
XIDLookupEnt *xident,
item;
HTAB *xidTable;
HOLDER *holder;
HOLDERTAG holdertag;
HTAB *holderTable;
bool found;
LOCK *lock = NULL;
LOCK *lock;
SPINLOCK masterLock;
LOCKMETHODTABLE *lockMethodTable;
int status;
TransactionId xid;
int myHolders[MAX_LOCKMODES];
int i;
#ifdef LOCK_DEBUG
if (lockmethod == USER_LOCKMETHOD && Trace_userlocks)
elog(DEBUG, "LockAcquire: user lock [%u] %s", locktag->objId.blkno, lock_types[lockmode]);
elog(DEBUG, "LockAcquire: user lock [%u] %s",
locktag->objId.blkno, lock_types[lockmode]);
#endif
/* ???????? This must be changed when short term locks will be used */
......@@ -501,7 +512,7 @@ LockAcquire(LOCKMETHOD lockmethod, LOCKTAG *locktag, LOCKMODE lockmode)
}
/* --------------------
* if there was nothing else there, complete initialization
* if it's a new lock object, initialize it
* --------------------
*/
if (!found)
......@@ -512,7 +523,6 @@ LockAcquire(LOCKMETHOD lockmethod, LOCKTAG *locktag, LOCKMODE lockmode)
MemSet((char *) lock->holders, 0, sizeof(int) * MAX_LOCKMODES);
MemSet((char *) lock->activeHolders, 0, sizeof(int) * MAX_LOCKMODES);
ProcQueueInit(&(lock->waitProcs));
Assert(lock->tag.objId.blkno == locktag->objId.blkno);
LOCK_PRINT("LockAcquire: new", lock, lockmode);
}
else
......@@ -524,68 +534,44 @@ LockAcquire(LOCKMETHOD lockmethod, LOCKTAG *locktag, LOCKMODE lockmode)
}
/* ------------------
* add an element to the lock queue so that we can clear the
* locks at end of transaction.
* Create the hash key for the holder table.
* ------------------
*/
xidTable = lockMethodTable->xidHash;
/* ------------------
* Zero out all of the tag bytes (this clears the padding bytes for long
* word alignment and ensures hashing consistency).
* ------------------
*/
MemSet(&item, 0, XID_TAGSIZE); /* must clear padding, needed */
item.tag.lock = MAKE_OFFSET(lock);
#ifdef USE_XIDTAG_LOCKMETHOD
item.tag.lockmethod = lockmethod;
#endif
#ifdef USER_LOCKS
if (lockmethod == USER_LOCKMETHOD)
{
item.tag.pid = MyProcPid;
item.tag.xid = xid = 0;
}
else
{
xid = GetCurrentTransactionId();
TransactionIdStore(xid, &item.tag.xid);
}
#else /* not USER_LOCKS */
xid = GetCurrentTransactionId();
TransactionIdStore(xid, &item.tag.xid);
#endif /* not USER_LOCKS */
MemSet(&holdertag, 0, sizeof(HOLDERTAG)); /* must clear padding, needed */
holdertag.lock = MAKE_OFFSET(lock);
holdertag.pid = MyProcPid;
TransactionIdStore(xid, &holdertag.xid);
/*
* Find or create an xid entry with this tag
* Find or create a holder entry with this tag
*/
xident = (XIDLookupEnt *) hash_search(xidTable, (Pointer) &item,
HASH_ENTER, &found);
if (!xident)
holderTable = lockMethodTable->holderHash;
holder = (HOLDER *) hash_search(holderTable, (Pointer) &holdertag,
HASH_ENTER, &found);
if (!holder)
{
SpinRelease(masterLock);
elog(NOTICE, "LockAcquire: xid table corrupted");
elog(NOTICE, "LockAcquire: holder table corrupted");
return FALSE;
}
/*
* If not found initialize the new entry
* If new, initialize the new entry
*/
if (!found)
{
xident->nHolding = 0;
MemSet((char *) xident->holders, 0, sizeof(int) * MAX_LOCKMODES);
ProcAddLock(&xident->queue);
XID_PRINT("LockAcquire: new", xident);
holder->nHolding = 0;
MemSet((char *) holder->holders, 0, sizeof(int) * MAX_LOCKMODES);
ProcAddLock(&holder->queue);
HOLDER_PRINT("LockAcquire: new", holder);
}
else
{
int i;
HOLDER_PRINT("LockAcquire: found", holder);
Assert((holder->nHolding > 0) && (holder->holders[lockmode] >= 0));
Assert(holder->nHolding <= lock->nActive);
XID_PRINT("LockAcquire: found", xident);
Assert((xident->nHolding > 0) && (xident->holders[lockmode] >= 0));
Assert(xident->nHolding <= lock->nActive);
#ifdef CHECK_DEADLOCK_RISK
/*
* Issue warning if we already hold a lower-level lock on this
* object and do not hold a lock of the requested level or higher.
......@@ -593,12 +579,16 @@ LockAcquire(LOCKMETHOD lockmethod, LOCKTAG *locktag, LOCKMODE lockmode)
* a deadlock if another backend were following the same code path
* at about the same time).
*
* This is not enabled by default, because it may generate log entries
* about user-level coding practices that are in fact safe in context.
* It can be enabled to help find system-level problems.
*
* XXX Doing numeric comparison on the lockmodes is a hack;
* it'd be better to use a table. For now, though, this works.
*/
for (i = lockMethodTable->ctl->numLockModes; i > 0; i--)
{
if (xident->holders[i] > 0)
if (holder->holders[i] > 0)
{
if (i >= (int) lockmode)
break; /* safe: we have a lock >= req level */
......@@ -609,12 +599,13 @@ LockAcquire(LOCKMETHOD lockmethod, LOCKTAG *locktag, LOCKMODE lockmode)
break;
}
}
#endif /* CHECK_DEADLOCK_RISK */
}
/* ----------------
* lock->nholding tells us how many processes have _tried_ to
* acquire this lock, Regardless of whether they succeeded or
* failed in doing so.
* lock->nHolding and lock->holders count the total number of holders
* either holding or waiting for the lock, so increment those immediately.
* The other counts don't increment till we get the lock.
* ----------------
*/
lock->nHolding++;
......@@ -622,18 +613,28 @@ LockAcquire(LOCKMETHOD lockmethod, LOCKTAG *locktag, LOCKMODE lockmode)
Assert((lock->nHolding > 0) && (lock->holders[lockmode] > 0));
/* --------------------
* If I'm the only one holding a lock, then there
* cannot be a conflict. The same is true if we already
* hold this lock.
* If I'm the only one holding any lock on this object, then there
* cannot be a conflict. The same is true if I already hold this lock.
* --------------------
*/
if (holder->nHolding == lock->nActive || holder->holders[lockmode] != 0)
{
GrantLock(lock, holder, lockmode);
HOLDER_PRINT("LockAcquire: owning", holder);
SpinRelease(masterLock);
return TRUE;
}
/* --------------------
* If this process (under any XID) is a holder of the lock,
* then there is no conflict, either.
* --------------------
*/
if (xident->nHolding == lock->nActive || xident->holders[lockmode] != 0)
LockCountMyLocks(holder->tag.lock, MyProc, myHolders);
if (myHolders[lockmode] != 0)
{
xident->holders[lockmode]++;
xident->nHolding++;
XID_PRINT("LockAcquire: owning", xident);
Assert((xident->nHolding > 0) && (xident->holders[lockmode] > 0));
GrantLock(lock, lockmode);
GrantLock(lock, holder, lockmode);
HOLDER_PRINT("LockAcquire: my other XID owning", holder);
SpinRelease(masterLock);
return TRUE;
}
......@@ -643,57 +644,56 @@ LockAcquire(LOCKMETHOD lockmethod, LOCKTAG *locktag, LOCKMODE lockmode)
*/
if (lockMethodTable->ctl->conflictTab[lockmode] & lock->waitMask)
{
int i = 1;
/*
* If I don't hold locks or my locks don't conflict with waiters
* then force to sleep.
* If my process doesn't hold any locks that conflict with waiters
* then force to sleep, so that prior waiters get first chance.
*/
if (xident->nHolding > 0)
for (i = 1; i <= lockMethodTable->ctl->numLockModes; i++)
{
for (; i <= lockMethodTable->ctl->numLockModes; i++)
{
if (xident->holders[i] > 0 &&
lockMethodTable->ctl->conflictTab[i] & lock->waitMask)
break; /* conflict */
}
if (myHolders[i] > 0 &&
lockMethodTable->ctl->conflictTab[i] & lock->waitMask)
break; /* yes, there is a conflict */
}
if (xident->nHolding == 0 || i > lockMethodTable->ctl->numLockModes)
if (i > lockMethodTable->ctl->numLockModes)
{
XID_PRINT("LockAcquire: higher priority proc waiting",
xident);
HOLDER_PRINT("LockAcquire: another proc already waiting",
holder);
status = STATUS_FOUND;
}
else
status = LockResolveConflicts(lockmethod, lock, lockmode, xid, xident);
status = LockResolveConflicts(lockmethod, lockmode,
lock, holder,
MyProc, myHolders);
}
else
status = LockResolveConflicts(lockmethod, lock, lockmode, xid, xident);
status = LockResolveConflicts(lockmethod, lockmode,
lock, holder,
MyProc, myHolders);
if (status == STATUS_OK)
GrantLock(lock, lockmode);
GrantLock(lock, holder, lockmode);
else if (status == STATUS_FOUND)
{
#ifdef USER_LOCKS
/*
* User locks are non blocking. If we can't acquire a lock we must
* remove the xid entry and return FALSE without waiting.
* remove the holder entry and return FALSE without waiting.
*/
if (lockmethod == USER_LOCKMETHOD)
{
if (!xident->nHolding)
if (holder->nHolding == 0)
{
SHMQueueDelete(&xident->queue);
xident = (XIDLookupEnt *) hash_search(xidTable,
(Pointer) xident,
HASH_REMOVE, &found);
if (!xident || !found)
elog(NOTICE, "LockAcquire: remove xid, table corrupted");
SHMQueueDelete(&holder->queue);
holder = (HOLDER *) hash_search(holderTable,
(Pointer) holder,
HASH_REMOVE, &found);
if (!holder || !found)
elog(NOTICE, "LockAcquire: remove holder, table corrupted");
}
else
XID_PRINT("LockAcquire: NHOLDING", xident);
HOLDER_PRINT("LockAcquire: NHOLDING", holder);
lock->nHolding--;
lock->holders[lockmode]--;
LOCK_PRINT("LockAcquire: user lock failed", lock, lockmode);
......@@ -705,38 +705,40 @@ LockAcquire(LOCKMETHOD lockmethod, LOCKTAG *locktag, LOCKMODE lockmode)
#endif /* USER_LOCKS */
/*
* Construct bitmask of locks we hold before going to sleep.
* Construct bitmask of locks this process holds on this object.
*/
MyProc->holdLock = 0;
if (xident->nHolding > 0)
{
int i,
tmpMask = 2;
int holdLock = 0;
int tmpMask;
for (i = 1; i <= lockMethodTable->ctl->numLockModes;
for (i = 1, tmpMask = 2;
i <= lockMethodTable->ctl->numLockModes;
i++, tmpMask <<= 1)
{
if (xident->holders[i] > 0)
MyProc->holdLock |= tmpMask;
if (myHolders[i] > 0)
holdLock |= tmpMask;
}
Assert(MyProc->holdLock != 0);
MyProc->holdLock = holdLock;
}
status = WaitOnLock(lockmethod, lock, lockmode);
/*
* Sleep till someone wakes me up.
*/
status = WaitOnLock(lockmethod, lockmode, lock, holder);
/*
* Check the xid entry status, in case something in the ipc
* Check the holder entry status, in case something in the ipc
* communication doesn't work correctly.
*/
if (!((xident->nHolding > 0) && (xident->holders[lockmode] > 0)))
if (!((holder->nHolding > 0) && (holder->holders[lockmode] > 0)))
{
XID_PRINT("LockAcquire: INCONSISTENT", xident);
HOLDER_PRINT("LockAcquire: INCONSISTENT", holder);
LOCK_PRINT("LockAcquire: INCONSISTENT", lock, lockmode);
/* Should we retry ? */
SpinRelease(masterLock);
return FALSE;
}
XID_PRINT("LockAcquire: granted", xident);
HOLDER_PRINT("LockAcquire: granted", holder);
LOCK_PRINT("LockAcquire: granted", lock, lockmode);
}
......@@ -755,93 +757,26 @@ LockAcquire(LOCKMETHOD lockmethod, LOCKTAG *locktag, LOCKMODE lockmode)
* determining whether or not any new lock acquired conflicts with
* the old ones.
*
* The caller can optionally pass the process's total holders counts, if
* known. If NULL is passed then these values will be computed internally.
* ----------------------------
*/
int
LockResolveConflicts(LOCKMETHOD lockmethod,
LOCK *lock,
LOCKMODE lockmode,
TransactionId xid,
XIDLookupEnt *xidentP) /* xident ptr or NULL */
LOCK *lock,
HOLDER *holder,
PROC *proc,
int *myHolders) /* myHolders[] array or NULL */
{
XIDLookupEnt *xident,
item;
int *myHolders;
int numLockModes;
HTAB *xidTable;
bool found;
LOCKMETHODCTL *lockctl = LockMethodTable[lockmethod]->ctl;
int numLockModes = lockctl->numLockModes;
int bitmask;
int i,
tmpMask;
int localHolders[MAX_LOCKMODES];
numLockModes = LockMethodTable[lockmethod]->ctl->numLockModes;
xidTable = LockMethodTable[lockmethod]->xidHash;
if (xidentP)
{
/*
* A pointer to the xid entry was supplied from the caller.
* Actually only LockAcquire can do it.
*/
xident = xidentP;
}
else
{
/* ---------------------
* read my own statistics from the xid table. If there
* isn't an entry, then we'll just add one.
*
* Zero out the tag, this clears the padding bytes for long
* word alignment and ensures hashing consistency.
* ------------------
*/
MemSet(&item, 0, XID_TAGSIZE);
item.tag.lock = MAKE_OFFSET(lock);
#ifdef USE_XIDTAG_LOCKMETHOD
item.tag.lockmethod = lockmethod;
#endif
#ifdef USER_LOCKS
if (lockmethod == USER_LOCKMETHOD)
{
item.tag.pid = MyProcPid;
item.tag.xid = 0;
}
else
#endif
TransactionIdStore(xid, &item.tag.xid);
/*
* Find or create an xid entry with this tag
*/
xident = (XIDLookupEnt *) hash_search(xidTable, (Pointer) &item,
HASH_ENTER, &found);
if (!xident)
{
elog(NOTICE, "LockResolveConflicts: xid table corrupted");
return STATUS_ERROR;
}
/*
* If not found initialize the new entry. THIS SHOULD NEVER
* HAPPEN, if we are trying to resolve a conflict we must already
* have allocated an xid entry for this lock. dz 21-11-1997
*/
if (!found)
{
/* ---------------
* we're not holding any type of lock yet. Clear
* the lock stats.
* ---------------
*/
MemSet(xident->holders, 0, numLockModes * sizeof(*(lock->holders)));
xident->nHolding = 0;
XID_PRINT("LockResolveConflicts: NOT FOUND", xident);
}
else
XID_PRINT("LockResolveConflicts: found", xident);
}
Assert((xident->nHolding >= 0) && (xident->holders[lockmode] >= 0));
Assert((holder->nHolding >= 0) && (holder->holders[lockmode] >= 0));
/* ----------------------------
* first check for global conflicts: If no locks conflict
......@@ -853,22 +788,27 @@ LockResolveConflicts(LOCKMETHOD lockmethod,
* compare tells if there is a conflict.
* ----------------------------
*/
if (!(LockMethodTable[lockmethod]->ctl->conflictTab[lockmode] & lock->mask))
if (!(lockctl->conflictTab[lockmode] & lock->mask))
{
xident->holders[lockmode]++;
xident->nHolding++;
XID_PRINT("LockResolveConflicts: no conflict", xident);
Assert((xident->nHolding > 0) && (xident->holders[lockmode] > 0));
HOLDER_PRINT("LockResolveConflicts: no conflict", holder);
return STATUS_OK;
}
/* ------------------------
* Rats. Something conflicts. But it could still be my own
* lock. We have to construct a conflict mask
* that does not reflect our own locks.
* that does not reflect our own locks. Locks held by the current
* process under another XID also count as "our own locks".
* ------------------------
*/
myHolders = xident->holders;
if (myHolders == NULL)
{
/* Caller didn't do calculation of total holding for me */
LockCountMyLocks(holder->tag.lock, proc, localHolders);
myHolders = localHolders;
}
/* Compute mask of lock types held by other processes */
bitmask = 0;
tmpMask = 2;
for (i = 1; i <= numLockModes; i++, tmpMask <<= 1)
......@@ -884,26 +824,99 @@ LockResolveConflicts(LOCKMETHOD lockmethod,
* conflict and I have to sleep.
* ------------------------
*/
if (!(LockMethodTable[lockmethod]->ctl->conflictTab[lockmode] & bitmask))
if (!(lockctl->conflictTab[lockmode] & bitmask))
{
/* no conflict. Get the lock and go on */
xident->holders[lockmode]++;
xident->nHolding++;
XID_PRINT("LockResolveConflicts: resolved", xident);
Assert((xident->nHolding > 0) && (xident->holders[lockmode] > 0));
/* no conflict. OK to get the lock */
HOLDER_PRINT("LockResolveConflicts: resolved", holder);
return STATUS_OK;
}
XID_PRINT("LockResolveConflicts: conflicting", xident);
HOLDER_PRINT("LockResolveConflicts: conflicting", holder);
return STATUS_FOUND;
}
/*
* GrantLock -- update the lock data structure to show
* the new lock holder.
* LockCountMyLocks --- Count total number of locks held on a given lockable
* object by a given process (under any transaction ID).
*
* XXX This could be rather slow if the process holds a large number of locks.
* Perhaps it could be sped up if we kept yet a third hashtable of per-
* process lock information. However, for the normal case where a transaction
* doesn't hold a large number of locks, keeping such a table would probably
* be a net slowdown.
*/
static void
LockCountMyLocks(SHMEM_OFFSET lockOffset, PROC *proc, int *myHolders)
{
HOLDER *holder = NULL;
HOLDER *nextHolder = NULL;
SHM_QUEUE *lockQueue = &(proc->lockQueue);
SHMEM_OFFSET end = MAKE_OFFSET(lockQueue);
int i;
MemSet(myHolders, 0, MAX_LOCKMODES * sizeof(int));
if (SHMQueueEmpty(lockQueue))
return;
SHMQueueFirst(lockQueue, (Pointer *) &holder, &holder->queue);
do
{
/* ---------------------------
* XXX Here we assume the shared memory queue is circular and
* that we know its internal structure. Should have some sort of
* macros to allow one to walk it. mer 20 July 1991
* ---------------------------
*/
if (holder->queue.next == end)
nextHolder = NULL;
else
SHMQueueFirst(&holder->queue,
(Pointer *) &nextHolder, &nextHolder->queue);
if (lockOffset == holder->tag.lock)
{
for (i = 1; i < MAX_LOCKMODES; i++)
{
myHolders[i] += holder->holders[i];
}
}
holder = nextHolder;
} while (holder);
}
/*
* LockGetMyHoldLocks -- compute bitmask of lock types held by a process
* for a given lockable object.
*/
static int
LockGetMyHoldLocks(SHMEM_OFFSET lockOffset, PROC *proc)
{
int myHolders[MAX_LOCKMODES];
int holdLock = 0;
int i,
tmpMask;
LockCountMyLocks(lockOffset, proc, myHolders);
for (i = 1, tmpMask = 2;
i < MAX_LOCKMODES;
i++, tmpMask <<= 1)
{
if (myHolders[i] > 0)
holdLock |= tmpMask;
}
return holdLock;
}
/*
* GrantLock -- update the lock and holder data structures to show
* the new lock has been granted.
*/
void
GrantLock(LOCK *lock, LOCKMODE lockmode)
GrantLock(LOCK *lock, HOLDER *holder, LOCKMODE lockmode)
{
lock->nActive++;
lock->activeHolders[lockmode]++;
......@@ -911,14 +924,23 @@ GrantLock(LOCK *lock, LOCKMODE lockmode)
LOCK_PRINT("GrantLock", lock, lockmode);
Assert((lock->nActive > 0) && (lock->activeHolders[lockmode] > 0));
Assert(lock->nActive <= lock->nHolding);
holder->holders[lockmode]++;
holder->nHolding++;
Assert((holder->nHolding > 0) && (holder->holders[lockmode] > 0));
}
/*
* WaitOnLock -- wait to acquire a lock
*
* The locktable spinlock must be held at entry.
*/
static int
WaitOnLock(LOCKMETHOD lockmethod, LOCK *lock, LOCKMODE lockmode)
WaitOnLock(LOCKMETHOD lockmethod, LOCKMODE lockmode,
LOCK *lock, HOLDER *holder)
{
PROC_QUEUE *waitQueue = &(lock->waitProcs);
LOCKMETHODTABLE *lockMethodTable = LockMethodTable[lockmethod];
char *new_status, *old_status;
char *new_status,
*old_status;
Assert(lockmethod < NumLockMethods);
......@@ -933,20 +955,21 @@ WaitOnLock(LOCKMETHOD lockmethod, LOCK *lock, LOCKMODE lockmode)
LOCK_PRINT("WaitOnLock: sleeping on lock", lock, lockmode);
old_status = pstrdup(get_ps_display());
new_status = (char *) palloc(strlen(get_ps_display()) + 10);
strcpy(new_status, get_ps_display());
new_status = (char *) palloc(strlen(old_status) + 10);
strcpy(new_status, old_status);
strcat(new_status, " waiting");
set_ps_display(new_status);
if (ProcSleep(waitQueue,
lockMethodTable->ctl,
if (ProcSleep(lockMethodTable->ctl,
lockmode,
lock) != NO_ERROR)
lock,
holder) != NO_ERROR)
{
/* -------------------
* We failed as a result of a deadlock, see HandleDeadLock().
* Decrement the lock nHolding and holders fields as
* we are no longer waiting on this lock.
* we are no longer waiting on this lock. Removal of the holder and
* lock objects, if no longer needed, will happen in xact cleanup.
* -------------------
*/
lock->nHolding--;
......@@ -980,19 +1003,19 @@ WaitOnLock(LOCKMETHOD lockmethod, LOCK *lock, LOCKMODE lockmode)
* priority waiting process, that process is granted the lock
* and awoken. (We have to grant the lock here to avoid a
* race between the waking process and any new process to
* come along and request the lock).
* come along and request the lock.)
*/
bool
LockRelease(LOCKMETHOD lockmethod, LOCKTAG *locktag, LOCKMODE lockmode)
LockRelease(LOCKMETHOD lockmethod, LOCKTAG *locktag,
TransactionId xid, LOCKMODE lockmode)
{
LOCK *lock = NULL;
LOCK *lock;
SPINLOCK masterLock;
bool found;
LOCKMETHODTABLE *lockMethodTable;
XIDLookupEnt *xident,
item;
HTAB *xidTable;
TransactionId xid;
HOLDER *holder;
HOLDERTAG holdertag;
HTAB *holderTable;
bool wakeupNeeded = true;
#ifdef LOCK_DEBUG
......@@ -1046,40 +1069,18 @@ LockRelease(LOCKMETHOD lockmethod, LOCKTAG *locktag, LOCKMODE lockmode)
Assert((lock->nActive > 0) && (lock->activeHolders[lockmode] >= 0));
Assert(lock->nActive <= lock->nHolding);
/* ------------------
* Zero out all of the tag bytes (this clears the padding bytes for long
* word alignment and ensures hashing consistency).
* ------------------
*/
MemSet(&item, 0, XID_TAGSIZE);
item.tag.lock = MAKE_OFFSET(lock);
#ifdef USE_XIDTAG_LOCKMETHOD
item.tag.lockmethod = lockmethod;
#endif
#ifdef USER_LOCKS
if (lockmethod == USER_LOCKMETHOD)
{
item.tag.pid = MyProcPid;
item.tag.xid = xid = 0;
}
else
{
xid = GetCurrentTransactionId();
TransactionIdStore(xid, &item.tag.xid);
}
#else
xid = GetCurrentTransactionId();
TransactionIdStore(xid, &item.tag.xid);
#endif
/*
* Find an xid entry with this tag
* Find the holder entry for this holder.
*/
xidTable = lockMethodTable->xidHash;
xident = (XIDLookupEnt *) hash_search(xidTable, (Pointer) &item,
HASH_FIND_SAVE, &found);
if (!xident || !found)
MemSet(&holdertag, 0, sizeof(HOLDERTAG)); /* must clear padding, needed */
holdertag.lock = MAKE_OFFSET(lock);
holdertag.pid = MyProcPid;
TransactionIdStore(xid, &holdertag.xid);
holderTable = lockMethodTable->holderHash;
holder = (HOLDER *) hash_search(holderTable, (Pointer) &holdertag,
HASH_FIND_SAVE, &found);
if (!holder || !found)
{
SpinRelease(masterLock);
#ifdef USER_LOCKS
......@@ -1087,26 +1088,26 @@ LockRelease(LOCKMETHOD lockmethod, LOCKTAG *locktag, LOCKMODE lockmode)
elog(NOTICE, "LockRelease: no lock with this tag");
else
#endif
elog(NOTICE, "LockRelease: xid table corrupted");
elog(NOTICE, "LockRelease: holder table corrupted");
return FALSE;
}
XID_PRINT("LockRelease: found", xident);
Assert(xident->tag.lock == MAKE_OFFSET(lock));
HOLDER_PRINT("LockRelease: found", holder);
Assert(holder->tag.lock == MAKE_OFFSET(lock));
/*
* Check that we are actually holding a lock of the type we want to
* release.
*/
if (!(xident->holders[lockmode] > 0))
if (!(holder->holders[lockmode] > 0))
{
SpinRelease(masterLock);
XID_PRINT("LockAcquire: WRONGTYPE", xident);
HOLDER_PRINT("LockRelease: WRONGTYPE", holder);
elog(NOTICE, "LockRelease: you don't own a lock of type %s",
lock_types[lockmode]);
Assert(xident->holders[lockmode] >= 0);
Assert(holder->holders[lockmode] >= 0);
return FALSE;
}
Assert(xident->nHolding > 0);
Assert(holder->nHolding > 0);
/*
* fix the general lock stats
......@@ -1116,6 +1117,12 @@ LockRelease(LOCKMETHOD lockmethod, LOCKTAG *locktag, LOCKMODE lockmode)
lock->nActive--;
lock->activeHolders[lockmode]--;
if (!(lock->activeHolders[lockmode]))
{
/* change the conflict mask. No more of this lock type. */
lock->mask &= BITS_OFF[lockmode];
}
#ifdef NOT_USED
/* --------------------------
* If there are still active locks of the type I just released, no one
......@@ -1139,12 +1146,6 @@ LockRelease(LOCKMETHOD lockmethod, LOCKTAG *locktag, LOCKMODE lockmode)
if (lockMethodTable->ctl->conflictTab[lockmode] & lock->waitMask)
wakeupNeeded = true;
if (!(lock->activeHolders[lockmode]))
{
/* change the conflict mask. No more of this lock type. */
lock->mask &= BITS_OFF[lockmode];
}
LOCK_PRINT("LockRelease: updated", lock, lockmode);
Assert((lock->nHolding >= 0) && (lock->holders[lockmode] >= 0));
Assert((lock->nActive >= 0) && (lock->activeHolders[lockmode] >= 0));
......@@ -1154,7 +1155,7 @@ LockRelease(LOCKMETHOD lockmethod, LOCKTAG *locktag, LOCKMODE lockmode)
{
/* ------------------
* if there's no one waiting in the queue,
* we just released the last lock.
* we just released the last lock on this object.
* Delete it from the lock table.
* ------------------
*/
......@@ -1168,39 +1169,38 @@ LockRelease(LOCKMETHOD lockmethod, LOCKTAG *locktag, LOCKMODE lockmode)
}
/*
* now check to see if I have any private locks. If I do, decrement
* the counts associated with them.
* Now fix the per-holder lock stats.
*/
xident->holders[lockmode]--;
xident->nHolding--;
XID_PRINT("LockRelease: updated", xident);
Assert((xident->nHolding >= 0) && (xident->holders[lockmode] >= 0));
holder->holders[lockmode]--;
holder->nHolding--;
HOLDER_PRINT("LockRelease: updated", holder);
Assert((holder->nHolding >= 0) && (holder->holders[lockmode] >= 0));
/*
* If this was my last hold on this lock, delete my entry in the XID
* If this was my last hold on this lock, delete my entry in the holder
* table.
*/
if (!xident->nHolding)
if (!holder->nHolding)
{
if (xident->queue.prev == INVALID_OFFSET)
elog(NOTICE, "LockRelease: xid.prev == INVALID_OFFSET");
if (xident->queue.next == INVALID_OFFSET)
elog(NOTICE, "LockRelease: xid.next == INVALID_OFFSET");
if (xident->queue.next != INVALID_OFFSET)
SHMQueueDelete(&xident->queue);
XID_PRINT("LockRelease: deleting", xident);
xident = (XIDLookupEnt *) hash_search(xidTable, (Pointer) &xident,
HASH_REMOVE_SAVED, &found);
if (!xident || !found)
if (holder->queue.prev == INVALID_OFFSET)
elog(NOTICE, "LockRelease: holder.prev == INVALID_OFFSET");
if (holder->queue.next == INVALID_OFFSET)
elog(NOTICE, "LockRelease: holder.next == INVALID_OFFSET");
if (holder->queue.next != INVALID_OFFSET)
SHMQueueDelete(&holder->queue);
HOLDER_PRINT("LockRelease: deleting", holder);
holder = (HOLDER *) hash_search(holderTable, (Pointer) &holder,
HASH_REMOVE_SAVED, &found);
if (!holder || !found)
{
SpinRelease(masterLock);
elog(NOTICE, "LockRelease: remove xid, table corrupted");
elog(NOTICE, "LockRelease: remove holder, table corrupted");
return FALSE;
}
}
if (wakeupNeeded)
ProcLockWakeup(&(lock->waitProcs), lockmethod, lock);
ProcLockWakeup(lockmethod, lock);
#ifdef LOCK_DEBUG
else if (LOCK_DEBUG_ENABLED(lock))
elog(DEBUG, "LockRelease: no wakeup needed");
......@@ -1211,16 +1211,23 @@ LockRelease(LOCKMETHOD lockmethod, LOCKTAG *locktag, LOCKMODE lockmode)
}
/*
* LockReleaseAll -- Release all locks in a process lock queue.
* LockReleaseAll -- Release all locks in a process's lock queue.
*
* Well, not really *all* locks.
*
* If 'allxids' is TRUE, all locks of the specified lock method are
* released, regardless of transaction affiliation.
*
* If 'allxids' is FALSE, all locks of the specified lock method and
* specified XID are released.
*/
bool
LockReleaseAll(LOCKMETHOD lockmethod, SHM_QUEUE *lockQueue)
LockReleaseAll(LOCKMETHOD lockmethod, PROC *proc,
bool allxids, TransactionId xid)
{
PROC_QUEUE *waitQueue;
int done;
XIDLookupEnt *xidLook = NULL;
XIDLookupEnt *tmp = NULL;
XIDLookupEnt *xident;
HOLDER *holder = NULL;
HOLDER *nextHolder = NULL;
SHM_QUEUE *lockQueue = &(proc->lockQueue);
SHMEM_OFFSET end = MAKE_OFFSET(lockQueue);
SPINLOCK masterLock;
LOCKMETHODTABLE *lockMethodTable;
......@@ -1228,21 +1235,19 @@ LockReleaseAll(LOCKMETHOD lockmethod, SHM_QUEUE *lockQueue)
numLockModes;
LOCK *lock;
bool found;
int xidtag_lockmethod,
nleft;
int nleft;
#ifdef LOCK_DEBUG
if (lockmethod == USER_LOCKMETHOD ? Trace_userlocks : Trace_locks)
elog(DEBUG, "LockReleaseAll: lockmethod=%d, pid=%d", lockmethod, MyProcPid);
elog(DEBUG, "LockReleaseAll: lockmethod=%d, pid=%d",
lockmethod, MyProcPid);
#endif
nleft = 0;
Assert(lockmethod < NumLockMethods);
lockMethodTable = LockMethodTable[lockmethod];
if (!lockMethodTable)
{
elog(NOTICE, "LockAcquire: bad lockmethod %d", lockmethod);
elog(NOTICE, "LockReleaseAll: bad lockmethod %d", lockmethod);
return FALSE;
}
......@@ -1253,9 +1258,12 @@ LockReleaseAll(LOCKMETHOD lockmethod, SHM_QUEUE *lockQueue)
masterLock = lockMethodTable->ctl->masterLock;
SpinAcquire(masterLock);
SHMQueueFirst(lockQueue, (Pointer *) &xidLook, &xidLook->queue);
for (;;)
SHMQueueFirst(lockQueue, (Pointer *) &holder, &holder->queue);
nleft = 0;
do
{
bool wakeupNeeded = false;
......@@ -1265,91 +1273,49 @@ LockReleaseAll(LOCKMETHOD lockmethod, SHM_QUEUE *lockQueue)
* macros to allow one to walk it. mer 20 July 1991
* ---------------------------
*/
done = (xidLook->queue.next == end);
lock = (LOCK *) MAKE_PTR(xidLook->tag.lock);
if (holder->queue.next == end)
nextHolder = NULL;
else
SHMQueueFirst(&holder->queue,
(Pointer *) &nextHolder, &nextHolder->queue);
Assert(holder->tag.pid == proc->pid);
xidtag_lockmethod = XIDENT_LOCKMETHOD(*xidLook);
if (xidtag_lockmethod == lockmethod)
lock = (LOCK *) MAKE_PTR(holder->tag.lock);
/* Ignore items that are not of the lockmethod to be removed */
if (LOCK_LOCKMETHOD(*lock) != lockmethod)
{
XID_PRINT("LockReleaseAll", xidLook);
LOCK_PRINT("LockReleaseAll", lock, 0);
nleft++;
goto next_item;
}
#ifdef USE_XIDTAG_LOCKMETHOD
if (xidtag_lockmethod != LOCK_LOCKMETHOD(*lock))
elog(NOTICE, "LockReleaseAll: xid/lock method mismatch: %d != %d",
xidtag_lockmethod, lock->tag.lockmethod);
#endif
if (xidtag_lockmethod != lockmethod)
/* If not allxids, ignore items that are of the wrong xid */
if (!allxids && xid != holder->tag.xid)
{
nleft++;
goto next_item;
}
HOLDER_PRINT("LockReleaseAll", holder);
LOCK_PRINT("LockReleaseAll", lock, 0);
Assert(lock->nHolding > 0);
Assert(lock->nActive > 0);
Assert(lock->nActive <= lock->nHolding);
Assert(xidLook->nHolding >= 0);
Assert(xidLook->nHolding <= lock->nHolding);
#ifdef USER_LOCKS
if (lockmethod == USER_LOCKMETHOD)
{
if ((xidLook->tag.pid == 0) || (xidLook->tag.xid != 0))
{
#ifdef LOCK_DEBUG
if (Trace_userlocks)
elog(DEBUG, "LockReleaseAll: skiping normal lock [%ld,%d,%d]",
xidLook->tag.lock, xidLook->tag.pid, xidLook->tag.xid);
#endif /* LOCK_DEBUG */
nleft++;
goto next_item;
}
if (xidLook->tag.pid != MyProcPid)
{
/* Should never happen */
elog(NOTICE,
"LockReleaseAll: INVALID PID: [%u] [%ld,%d,%d]",
lock->tag.objId.blkno,
xidLook->tag.lock, xidLook->tag.pid, xidLook->tag.xid);
nleft++;
goto next_item;
}
#ifdef LOCK_DEBUG
if (Trace_userlocks)
elog(DEBUG, "LockReleaseAll: releasing user lock [%u] [%ld,%d,%d]",
lock->tag.objId.blkno, xidLook->tag.lock, xidLook->tag.pid, xidLook->tag.xid);
#endif /* LOCK_DEBUG */
}
else
{
/*
* Can't check xidLook->tag.xid, can be 0 also for normal locks
*/
if (xidLook->tag.pid != 0)
{
#ifdef LOCK_DEBUG
if (Trace_userlocks)
elog(DEBUG, "LockReleaseAll: skiping user lock [%u] [%ld,%d,%d]",
lock->tag.objId.blkno, xidLook->tag.lock, xidLook->tag.pid, xidLook->tag.xid);
#endif /* LOCK_DEBUG */
nleft++;
goto next_item;
}
}
#endif /* USER_LOCKS */
Assert(holder->nHolding >= 0);
Assert(holder->nHolding <= lock->nHolding);
/* ------------------
* fix the general lock stats
* ------------------
*/
if (lock->nHolding != xidLook->nHolding)
if (lock->nHolding != holder->nHolding)
{
for (i = 1; i <= numLockModes; i++)
{
Assert(xidLook->holders[i] >= 0);
lock->holders[i] -= xidLook->holders[i];
lock->activeHolders[i] -= xidLook->holders[i];
Assert(holder->holders[i] >= 0);
lock->holders[i] -= holder->holders[i];
lock->activeHolders[i] -= holder->holders[i];
Assert((lock->holders[i] >= 0) \
&&(lock->activeHolders[i] >= 0));
if (!lock->activeHolders[i])
......@@ -1358,12 +1324,12 @@ LockReleaseAll(LOCKMETHOD lockmethod, SHM_QUEUE *lockQueue)
/*
* Read comments in LockRelease
*/
if (!wakeupNeeded && xidLook->holders[i] > 0 &&
if (!wakeupNeeded && holder->holders[i] > 0 &&
lockMethodTable->ctl->conflictTab[i] & lock->waitMask)
wakeupNeeded = true;
}
lock->nHolding -= xidLook->nHolding;
lock->nActive -= xidLook->nHolding;
lock->nHolding -= holder->nHolding;
lock->nActive -= holder->nHolding;
Assert((lock->nHolding >= 0) && (lock->nActive >= 0));
Assert(lock->nActive <= lock->nHolding);
}
......@@ -1384,25 +1350,24 @@ LockReleaseAll(LOCKMETHOD lockmethod, SHM_QUEUE *lockQueue)
}
LOCK_PRINT("LockReleaseAll: updated", lock, 0);
HOLDER_PRINT("LockReleaseAll: deleting", holder);
/*
* Remove the xid from the process lock queue
* Remove the holder entry from the process' lock queue
*/
SHMQueueDelete(&xidLook->queue);
SHMQueueDelete(&holder->queue);
/* ----------------
* always remove the xidLookup entry, we're done with it now
* ----------------
/*
* remove the holder entry from the hashtable
*/
XID_PRINT("LockReleaseAll: deleting", xidLook);
xident = (XIDLookupEnt *) hash_search(lockMethodTable->xidHash,
(Pointer) xidLook,
HASH_REMOVE,
&found);
if (!xident || !found)
holder = (HOLDER *) hash_search(lockMethodTable->holderHash,
(Pointer) holder,
HASH_REMOVE,
&found);
if (!holder || !found)
{
SpinRelease(masterLock);
elog(NOTICE, "LockReleaseAll: xid table corrupted");
elog(NOTICE, "LockReleaseAll: holder table corrupted");
return FALSE;
}
......@@ -1426,17 +1391,11 @@ LockReleaseAll(LOCKMETHOD lockmethod, SHM_QUEUE *lockQueue)
}
}
else if (wakeupNeeded)
{
waitQueue = &(lock->waitProcs);
ProcLockWakeup(waitQueue, lockmethod, lock);
}
ProcLockWakeup(lockmethod, lock);
next_item:
if (done)
break;
SHMQueueFirst(&xidLook->queue, (Pointer *) &tmp, &tmp->queue);
xidLook = tmp;
}
holder = nextHolder;
} while (holder);
/*
* Reinitialize the queue only if nothing has been left in.
......@@ -1474,10 +1433,10 @@ LockShmemSize(int maxBackends)
SHMEM_LOCKTAB_KEYSIZE,
SHMEM_LOCKTAB_DATASIZE);
/* xidHash table */
/* holderHash table */
size += hash_estimate_size(NLOCKENTS(maxBackends),
SHMEM_XIDTAB_KEYSIZE,
SHMEM_XIDTAB_DATASIZE);
SHMEM_HOLDERTAB_KEYSIZE,
SHMEM_HOLDERTAB_DATASIZE);
/*
* Since the lockHash entry count above is only an estimate, add 10%
......@@ -1488,16 +1447,6 @@ LockShmemSize(int maxBackends)
return size;
}
/* -----------------
* Boolean function to determine current locking status
* -----------------
*/
bool
LockingDisabled()
{
return LockingIsDisabled;
}
/*
* DeadlockCheck -- Checks for deadlocks for a given process
*
......@@ -1512,20 +1461,19 @@ LockingDisabled()
* We have already locked the master lock before being called.
*/
bool
DeadLockCheck(void *proc, LOCK *findlock)
DeadLockCheck(PROC *thisProc, LOCK *findlock)
{
XIDLookupEnt *xidLook = NULL;
XIDLookupEnt *tmp = NULL;
PROC *thisProc = (PROC *) proc,
*waitProc;
HOLDER *holder = NULL;
HOLDER *nextHolder = NULL;
PROC *waitProc;
PROC_QUEUE *waitQueue;
SHM_QUEUE *lockQueue = &(thisProc->lockQueue);
SHMEM_OFFSET end = MAKE_OFFSET(lockQueue);
LOCKMETHODCTL *lockctl = LockMethodTable[DEFAULT_LOCKMETHOD]->ctl;
LOCK *lock;
PROC_QUEUE *waitQueue;
int i,
j;
bool first_run = (thisProc == MyProc),
done;
bool first_run = (thisProc == MyProc);
static PROC *checked_procs[MAXBACKENDS];
static int nprocs;
......@@ -1533,27 +1481,40 @@ DeadLockCheck(void *proc, LOCK *findlock)
/* initialize at start of recursion */
if (first_run)
{
checked_procs[0] = MyProc;
checked_procs[0] = thisProc;
nprocs = 1;
}
if (SHMQueueEmpty(lockQueue))
return false;
SHMQueueFirst(lockQueue, (Pointer *) &xidLook, &xidLook->queue);
XID_PRINT("DeadLockCheck", xidLook);
SHMQueueFirst(lockQueue, (Pointer *) &holder, &holder->queue);
for (;;)
do
{
done = (xidLook->queue.next == end);
lock = (LOCK *) MAKE_PTR(xidLook->tag.lock);
/* ---------------------------
* XXX Here we assume the shared memory queue is circular and
* that we know its internal structure. Should have some sort of
* macros to allow one to walk it. mer 20 July 1991
* ---------------------------
*/
if (holder->queue.next == end)
nextHolder = NULL;
else
SHMQueueFirst(&holder->queue,
(Pointer *) &nextHolder, &nextHolder->queue);
LOCK_PRINT("DeadLockCheck", lock, 0);
Assert(holder->tag.pid == thisProc->pid);
lock = (LOCK *) MAKE_PTR(holder->tag.lock);
if (lock->tag.relId == 0) /* user' lock */
/* Ignore user locks */
if (lock->tag.lockmethod != DEFAULT_LOCKMETHOD)
goto nxtl;
HOLDER_PRINT("DeadLockCheck", holder);
LOCK_PRINT("DeadLockCheck", lock, 0);
/*
* waitLock is always in lockQueue of waiting proc, if !first_run
* then upper caller will handle waitProcs queue of waitLock.
......@@ -1567,14 +1528,12 @@ DeadLockCheck(void *proc, LOCK *findlock)
*/
if (lock == findlock && !first_run)
{
LOCKMETHODCTL *lockctl =
LockMethodTable[DEFAULT_LOCKMETHOD]->ctl;
int lm;
Assert(xidLook->nHolding > 0);
Assert(holder->nHolding > 0);
for (lm = 1; lm <= lockctl->numLockModes; lm++)
{
if (xidLook->holders[lm] > 0 &&
if (holder->holders[lm] > 0 &&
lockctl->conflictTab[lm] & findlock->waitMask)
return true;
}
......@@ -1588,37 +1547,38 @@ DeadLockCheck(void *proc, LOCK *findlock)
waitQueue = &(lock->waitProcs);
waitProc = (PROC *) MAKE_PTR(waitQueue->links.prev);
for (i = 0; i < waitQueue->size; i++)
/*
* NOTE: loop must count down because we want to examine each item
* in the queue even if waitQueue->size decreases due to waking up
* some of the processes.
*/
for (i = waitQueue->size; --i >= 0; )
{
Assert(waitProc->waitLock == lock);
if (waitProc == thisProc)
{
Assert(waitProc->waitLock == lock);
/* This should only happen at first level */
Assert(waitProc == MyProc);
waitProc = (PROC *) MAKE_PTR(waitProc->links.prev);
continue;
goto nextWaitProc;
}
if (lock == findlock) /* first_run also true */
{
LOCKMETHODCTL *lockctl =
LockMethodTable[DEFAULT_LOCKMETHOD]->ctl;
/*
* If me blocked by his holdlock...
*/
if (lockctl->conflictTab[MyProc->token] & waitProc->holdLock)
if (lockctl->conflictTab[MyProc->waitLockMode] & waitProc->holdLock)
{
/* and he blocked by me -> deadlock */
if (lockctl->conflictTab[waitProc->token] & MyProc->holdLock)
if (lockctl->conflictTab[waitProc->waitLockMode] & MyProc->holdLock)
return true;
/* we shouldn't look at lockQueue of our blockers */
waitProc = (PROC *) MAKE_PTR(waitProc->links.prev);
continue;
goto nextWaitProc;
}
/*
* If he isn't blocked by me and we request
* non-conflicting lock modes - no deadlock here because
* of he isn't blocked by me in any sence (explicitle or
* he isn't blocked by me in any sense (explicitly or
* implicitly). Note that we don't do like test if
* !first_run (when thisProc is holder and non-waiter on
* lock) and so we call DeadLockCheck below for every
......@@ -1626,123 +1586,108 @@ DeadLockCheck(void *proc, LOCK *findlock)
* un-blocked by thisProc. Should we? This could save us
* some time...
*/
if (!(lockctl->conflictTab[waitProc->token] & MyProc->holdLock) &&
!(lockctl->conflictTab[waitProc->token] & (1 << MyProc->token)))
{
waitProc = (PROC *) MAKE_PTR(waitProc->links.prev);
continue;
}
if (!(lockctl->conflictTab[waitProc->waitLockMode] & MyProc->holdLock) &&
!(lockctl->conflictTab[waitProc->waitLockMode] & (1 << MyProc->waitLockMode)))
goto nextWaitProc;
}
/*
* Look in lockQueue of this waitProc, if didn't do this
* before.
* Skip this waiter if already checked.
*/
for (j = 0; j < nprocs; j++)
{
if (checked_procs[j] == waitProc)
break;
goto nextWaitProc;
}
if (j >= nprocs)
/* Recursively check this process's lockQueue. */
Assert(nprocs < MAXBACKENDS);
checked_procs[nprocs++] = waitProc;
if (DeadLockCheck(waitProc, findlock))
{
Assert(nprocs < MAXBACKENDS);
checked_procs[nprocs++] = waitProc;
int holdLock;
if (DeadLockCheck(waitProc, findlock))
/*
* Ok, but is waitProc waiting for me (thisProc) ?
*/
if (thisProc->waitLock == lock)
{
Assert(first_run);
holdLock = thisProc->holdLock;
}
else
{
/* should we cache holdLock to speed this up? */
holdLock = LockGetMyHoldLocks(holder->tag.lock, thisProc);
Assert(holdLock != 0);
}
if (lockctl->conflictTab[waitProc->waitLockMode] & holdLock)
{
LOCKMETHODCTL *lockctl =
LockMethodTable[DEFAULT_LOCKMETHOD]->ctl;
int holdLock;
/*
* Ok, but is waitProc waiting for me (thisProc) ?
* Last attempt to avoid deadlock: try to wakeup myself.
*/
if (thisProc->waitLock == lock)
{
Assert(first_run);
holdLock = thisProc->holdLock;
}
else
/* should we cache holdLock ? */
{
int lm,
tmpMask = 2;
Assert(xidLook->nHolding > 0);
for (holdLock = 0, lm = 1;
lm <= lockctl->numLockModes;
lm++, tmpMask <<= 1)
{
if (xidLook->holders[lm] > 0)
holdLock |= tmpMask;
}
Assert(holdLock != 0);
}
if (lockctl->conflictTab[waitProc->token] & holdLock)
if (first_run)
{
/*
* Last attempt to avoid deadlock - try to wakeup
* myself.
*/
if (first_run)
if (LockResolveConflicts(DEFAULT_LOCKMETHOD,
MyProc->waitLockMode,
MyProc->waitLock,
MyProc->waitHolder,
MyProc,
NULL) == STATUS_OK)
{
if (LockResolveConflicts(DEFAULT_LOCKMETHOD,
MyProc->waitLock,
MyProc->token,
MyProc->xid,
NULL) == STATUS_OK)
{
SetWaitingForLock(false);
GrantLock(MyProc->waitLock, MyProc->token);
(MyProc->waitLock->waitProcs.size)--;
ProcWakeup(MyProc, NO_ERROR);
return false;
}
SetWaitingForLock(false);
GrantLock(MyProc->waitLock,
MyProc->waitHolder,
MyProc->waitLockMode);
ProcWakeup(MyProc, NO_ERROR);
return false;
}
return true;
}
/*
* Hell! Is he blocked by any (other) holder ?
*/
if (LockResolveConflicts(DEFAULT_LOCKMETHOD,
lock,
waitProc->token,
waitProc->xid,
NULL) != STATUS_OK)
{
/*
* Blocked by others - no deadlock...
*/
LOCK_PRINT("DeadLockCheck: blocked by others", lock, waitProc->token);
waitProc = (PROC *) MAKE_PTR(waitProc->links.prev);
continue;
}
return true;
}
/*
* Hell! Is he blocked by any (other) holder ?
*/
if (LockResolveConflicts(DEFAULT_LOCKMETHOD,
waitProc->waitLockMode,
lock,
waitProc->waitHolder,
waitProc,
NULL) != STATUS_OK)
{
/*
* Well - wakeup this guy! This is the case of
* implicit blocking: thisProc blocked someone who
* blocked waitProc by the fact that he/someone is
* already waiting for lock. We do this for
* anti-starving.
* Blocked by others - no deadlock...
*/
GrantLock(lock, waitProc->token);
waitQueue->size--;
waitProc = ProcWakeup(waitProc, NO_ERROR);
continue;
LOCK_PRINT("DeadLockCheck: blocked by others",
lock, waitProc->waitLockMode);
goto nextWaitProc;
}
/*
* Well - wakeup this guy! This is the case of
* implicit blocking: thisProc blocked someone who
* blocked waitProc by the fact that he/someone is
* already waiting for lock. We do this for
* anti-starving.
*/
GrantLock(lock, waitProc->waitHolder, waitProc->waitLockMode);
waitProc = ProcWakeup(waitProc, NO_ERROR);
/*
* Use next-proc link returned by ProcWakeup, since this
* proc's own links field is now cleared.
*/
continue;
}
nextWaitProc:
waitProc = (PROC *) MAKE_PTR(waitProc->links.prev);
}
nxtl: ;
if (done)
break;
SHMQueueFirst(&xidLook->queue, (Pointer *) &tmp, &tmp->queue);
xidLook = tmp;
}
nxtl:
holder = nextHolder;
} while (holder);
/* if we got here, no deadlock */
return false;
......@@ -1754,17 +1699,15 @@ nxtl: ;
* the masterLock.
*/
void
DumpLocks()
DumpLocks(void)
{
SHMEM_OFFSET location;
PROC *proc;
SHM_QUEUE *lockQueue;
int done;
XIDLookupEnt *xidLook = NULL;
XIDLookupEnt *tmp = NULL;
HOLDER *holder = NULL;
HOLDER *nextHolder = NULL;
SHMEM_OFFSET end;
LOCK *lock;
int count = 0;
int lockmethod = DEFAULT_LOCKMETHOD;
LOCKMETHODTABLE *lockMethodTable;
......@@ -1775,64 +1718,60 @@ DumpLocks()
if (proc != MyProc)
return;
lockQueue = &proc->lockQueue;
end = MAKE_OFFSET(lockQueue);
Assert(lockmethod < NumLockMethods);
lockMethodTable = LockMethodTable[lockmethod];
if (!lockMethodTable)
return;
if (proc->waitLock)
LOCK_PRINT("DumpLocks: waiting on", proc->waitLock, 0);
if (SHMQueueEmpty(lockQueue))
return;
SHMQueueFirst(lockQueue, (Pointer *) &xidLook, &xidLook->queue);
end = MAKE_OFFSET(lockQueue);
if (MyProc->waitLock)
LOCK_PRINT("DumpLocks: waiting on", MyProc->waitLock, 0);
SHMQueueFirst(lockQueue, (Pointer *) &holder, &holder->queue);
for (;;)
do
{
if (count++ > 2000)
{
elog(NOTICE, "DumpLocks: xid loop detected, giving up");
break;
}
/* ---------------------------
* XXX Here we assume the shared memory queue is circular and
* that we know its internal structure. Should have some sort of
* macros to allow one to walk it. mer 20 July 1991
* ---------------------------
*/
done = (xidLook->queue.next == end);
lock = (LOCK *) MAKE_PTR(xidLook->tag.lock);
if (holder->queue.next == end)
nextHolder = NULL;
else
SHMQueueFirst(&holder->queue,
(Pointer *) &nextHolder, &nextHolder->queue);
XID_PRINT("DumpLocks", xidLook);
LOCK_PRINT("DumpLocks", lock, 0);
Assert(holder->tag.pid == proc->pid);
if (done)
break;
lock = (LOCK *) MAKE_PTR(holder->tag.lock);
SHMQueueFirst(&xidLook->queue, (Pointer *) &tmp, &tmp->queue);
xidLook = tmp;
}
HOLDER_PRINT("DumpLocks", holder);
LOCK_PRINT("DumpLocks", lock, 0);
holder = nextHolder;
} while (holder);
}
/*
* Dump all postgres locks. Must have already acquired the masterLock.
*/
void
DumpAllLocks()
DumpAllLocks(void)
{
SHMEM_OFFSET location;
PROC *proc;
XIDLookupEnt *xidLook = NULL;
HOLDER *holder = NULL;
LOCK *lock;
int pid;
int count = 0;
int lockmethod = DEFAULT_LOCKMETHOD;
LOCKMETHODTABLE *lockMethodTable;
HTAB *xidTable;
HTAB *holderTable;
pid = getpid();
ShmemPIDLookup(pid, &location);
......@@ -1847,30 +1786,24 @@ DumpAllLocks()
if (!lockMethodTable)
return;
xidTable = lockMethodTable->xidHash;
holderTable = lockMethodTable->holderHash;
if (MyProc->waitLock)
LOCK_PRINT("DumpAllLocks: waiting on", MyProc->waitLock, 0);
if (proc->waitLock)
LOCK_PRINT("DumpAllLocks: waiting on", proc->waitLock, 0);
hash_seq(NULL);
while ((xidLook = (XIDLookupEnt *) hash_seq(xidTable)) &&
(xidLook != (XIDLookupEnt *) TRUE))
while ((holder = (HOLDER *) hash_seq(holderTable)) &&
(holder != (HOLDER *) TRUE))
{
XID_PRINT("DumpAllLocks", xidLook);
HOLDER_PRINT("DumpAllLocks", holder);
if (xidLook->tag.lock)
if (holder->tag.lock)
{
lock = (LOCK *) MAKE_PTR(xidLook->tag.lock);
lock = (LOCK *) MAKE_PTR(holder->tag.lock);
LOCK_PRINT("DumpAllLocks", lock, 0);
}
else
elog(DEBUG, "DumpAllLocks: xidLook->tag.lock = NULL");
if (count++ > 2000)
{
elog(NOTICE, "DumpAllLocks: possible loop, giving up");
break;
}
elog(DEBUG, "DumpAllLocks: holder->tag.lock = NULL");
}
}
......
......@@ -8,7 +8,7 @@
*
*
* IDENTIFICATION
* $Header: /cvsroot/pgsql/src/backend/storage/lmgr/proc.c,v 1.88 2000/12/18 17:33:41 tgl Exp $
* $Header: /cvsroot/pgsql/src/backend/storage/lmgr/proc.c,v 1.89 2000/12/22 00:51:54 tgl Exp $
*
*-------------------------------------------------------------------------
*/
......@@ -48,7 +48,7 @@
* This is so that we can support more backends. (system-wide semaphore
* sets run out pretty fast.) -ay 4/95
*
* $Header: /cvsroot/pgsql/src/backend/storage/lmgr/proc.c,v 1.88 2000/12/18 17:33:41 tgl Exp $
* $Header: /cvsroot/pgsql/src/backend/storage/lmgr/proc.c,v 1.89 2000/12/22 00:51:54 tgl Exp $
*/
#include "postgres.h"
......@@ -74,13 +74,14 @@
#include <sys/sem.h>
#endif
#include "access/xact.h"
#include "storage/proc.h"
void HandleDeadLock(SIGNAL_ARGS);
static void ProcFreeAllSemaphores(void);
static bool GetOffWaitqueue(PROC *);
static bool GetOffWaitQueue(PROC *);
int DeadlockTimeout = 1000;
......@@ -300,50 +301,76 @@ InitProcess(void)
/* -----------------------
* get process off any wait queue it might be on
*
* NB: this does not remove the process' holder object, nor the lock object,
* even though their holder counts might now have gone to zero. That will
* happen during a subsequent LockReleaseAll call, which we expect will happen
* during transaction cleanup. (Removal of a proc from its wait queue by
* this routine can only happen if we are aborting the transaction.)
* -----------------------
*/
static bool
GetOffWaitqueue(PROC *proc)
GetOffWaitQueue(PROC *proc)
{
bool getoffed = false;
bool gotoff = false;
LockLockTable();
if (proc->links.next != INVALID_OFFSET)
{
int lockmode = proc->token;
LOCK *waitLock = proc->waitLock;
LOCK *waitLock = proc->waitLock;
LOCKMODE lockmode = proc->waitLockMode;
/* Remove proc from lock's wait queue */
Assert(waitLock);
Assert(waitLock->waitProcs.size > 0);
SHMQueueDelete(&(proc->links));
--waitLock->waitProcs.size;
/* Undo increments of holder counts by waiting process */
Assert(waitLock->nHolding > 0);
Assert(waitLock->nHolding > proc->waitLock->nActive);
--waitLock->nHolding;
Assert(waitLock->holders[lockmode] > 0);
--waitLock->holders[lockmode];
/* don't forget to clear waitMask bit if appropriate */
if (waitLock->activeHolders[lockmode] == waitLock->holders[lockmode])
waitLock->waitMask &= ~(1 << lockmode);
ProcLockWakeup(&(waitLock->waitProcs), LOCK_LOCKMETHOD(*waitLock), waitLock);
getoffed = true;
/* Clean up the proc's own state */
SHMQueueElemInit(&(proc->links));
proc->waitLock = NULL;
proc->waitHolder = NULL;
/* See if any other waiters can be woken up now */
ProcLockWakeup(LOCK_LOCKMETHOD(*waitLock), waitLock);
gotoff = true;
}
SHMQueueElemInit(&(proc->links));
UnlockLockTable();
return getoffed;
return gotoff;
}
/*
* ProcReleaseLocks() -- release all locks associated with current transaction
* ProcReleaseLocks() -- release locks associated with current transaction
* at transaction commit or abort
*
* At commit, we release only locks tagged with the current transaction's XID,
* leaving those marked with XID 0 (ie, session locks) undisturbed. At abort,
* we release all locks including XID 0, because we need to clean up after
* a failure. This logic will need extension if we ever support nested
* transactions.
*
* Note that user locks are not released in either case.
*/
void
ProcReleaseLocks()
ProcReleaseLocks(bool isCommit)
{
if (!MyProc)
return;
LockReleaseAll(DEFAULT_LOCKMETHOD, &MyProc->lockQueue);
GetOffWaitqueue(MyProc);
GetOffWaitQueue(MyProc);
LockReleaseAll(DEFAULT_LOCKMETHOD, MyProc,
!isCommit, GetCurrentTransactionId());
}
/*
......@@ -384,47 +411,47 @@ static void
ProcKill(int exitStatus, Datum pid)
{
PROC *proc;
SHMEM_OFFSET location;
/* --------------------
* If this is a FATAL exit the postmaster will have to kill all the
* existing backends and reinitialize shared memory. So all we don't
* existing backends and reinitialize shared memory. So we don't
* need to do anything here.
* --------------------
*/
if (exitStatus != 0)
return;
ShmemPIDLookup(MyProcPid, &location);
if (location == INVALID_OFFSET)
return;
proc = (PROC *) MAKE_PTR(location);
if ((int) pid == MyProcPid)
{
proc = MyProc;
MyProc = NULL;
}
else
{
/* This path is dead code at the moment ... */
SHMEM_OFFSET location = INVALID_OFFSET;
Assert(proc == MyProc || (int)pid != MyProcPid);
ShmemPIDLookup((int) pid, &location);
if (location == INVALID_OFFSET)
return;
proc = (PROC *) MAKE_PTR(location);
}
MyProc = NULL;
Assert(proc);
/* ---------------
* Assume one lock table.
* ---------------
*/
/* Release any spinlocks the proc is holding */
ProcReleaseSpins(proc);
LockReleaseAll(DEFAULT_LOCKMETHOD, &proc->lockQueue);
#ifdef USER_LOCKS
/* Get the proc off any wait queue it might be on */
GetOffWaitQueue(proc);
/*
* Assume we have a second lock table.
*/
LockReleaseAll(USER_LOCKMETHOD, &proc->lockQueue);
#endif
/* Remove from the standard lock table */
LockReleaseAll(DEFAULT_LOCKMETHOD, proc, true, InvalidTransactionId);
/* ----------------
* get off the wait queue
* ----------------
*/
GetOffWaitqueue(proc);
#ifdef USER_LOCKS
/* Remove from the user lock table */
LockReleaseAll(USER_LOCKMETHOD, proc, true, InvalidTransactionId);
#endif
}
/*
......@@ -488,10 +515,10 @@ SetWaitingForLock(bool waiting)
}
if (QueryCancel) /* cancel request pending */
{
if (GetOffWaitqueue(MyProc))
if (GetOffWaitQueue(MyProc))
{
lockWaiting = false;
elog(ERROR, "Query cancel requested while waiting lock");
elog(ERROR, "Query cancel requested while waiting for lock");
}
}
}
......@@ -519,8 +546,8 @@ LockWaitCancel(void)
set_alarm(B_INFINITE_TIMEOUT, B_PERIODIC_ALARM);
#endif /* __BEOS__ */
if (GetOffWaitqueue(MyProc))
elog(ERROR, "Query cancel requested while waiting lock");
if (GetOffWaitQueue(MyProc))
elog(ERROR, "Query cancel requested while waiting for lock");
}
/*
......@@ -538,18 +565,19 @@ LockWaitCancel(void)
* NOTES: The process queue is now a priority queue for locking.
*/
int
ProcSleep(PROC_QUEUE *waitQueue,/* lock->waitProcs */
LOCKMETHODCTL *lockctl,
int token, /* lockmode */
LOCK *lock)
ProcSleep(LOCKMETHODCTL *lockctl,
LOCKMODE lockmode,
LOCK *lock,
HOLDER *holder)
{
int i;
PROC_QUEUE *waitQueue = &(lock->waitProcs);
SPINLOCK spinlock = lockctl->masterLock;
PROC *proc;
int myMask = (1 << token);
int myMask = (1 << lockmode);
int waitMask = lock->waitMask;
PROC *proc;
int i;
int aheadHolders[MAX_LOCKMODES];
bool selfConflict = (lockctl->conflictTab[token] & myMask),
bool selfConflict = (lockctl->conflictTab[lockmode] & myMask),
prevSame = false;
#ifndef __BEOS__
struct itimerval timeval,
......@@ -558,26 +586,28 @@ ProcSleep(PROC_QUEUE *waitQueue,/* lock->waitProcs */
bigtime_t time_interval;
#endif
MyProc->token = token;
MyProc->waitLock = lock;
MyProc->waitHolder = holder;
MyProc->waitLockMode = lockmode;
/* We assume the caller set up MyProc->holdLock */
proc = (PROC *) MAKE_PTR(waitQueue->links.prev);
/* if we don't conflict with any waiter - be first in queue */
if (!(lockctl->conflictTab[token] & waitMask))
if (!(lockctl->conflictTab[lockmode] & waitMask))
goto ins;
for (i = 1; i < MAX_LOCKMODES; i++)
aheadHolders[i] = lock->activeHolders[i];
(aheadHolders[token])++;
(aheadHolders[lockmode])++;
for (i = 0; i < waitQueue->size; i++)
{
/* am I waiting for him ? */
if (lockctl->conflictTab[token] & proc->holdLock)
if (lockctl->conflictTab[lockmode] & proc->holdLock)
{
/* is he waiting for me ? */
if (lockctl->conflictTab[proc->token] & MyProc->holdLock)
if (lockctl->conflictTab[proc->waitLockMode] & MyProc->holdLock)
{
/* Yes, report deadlock failure */
MyProc->errType = STATUS_ERROR;
......@@ -586,10 +616,10 @@ ProcSleep(PROC_QUEUE *waitQueue,/* lock->waitProcs */
/* being waiting for him - go past */
}
/* if he waits for me */
else if (lockctl->conflictTab[proc->token] & MyProc->holdLock)
else if (lockctl->conflictTab[proc->waitLockMode] & MyProc->holdLock)
break;
/* if conflicting locks requested */
else if (lockctl->conflictTab[proc->token] & myMask)
else if (lockctl->conflictTab[proc->waitLockMode] & myMask)
{
/*
......@@ -604,13 +634,13 @@ ProcSleep(PROC_QUEUE *waitQueue,/* lock->waitProcs */
* Last attempt to don't move any more: if we don't conflict with
* rest waiters in queue.
*/
else if (!(lockctl->conflictTab[token] & waitMask))
else if (!(lockctl->conflictTab[lockmode] & waitMask))
break;
prevSame = (proc->token == token);
(aheadHolders[proc->token])++;
if (aheadHolders[proc->token] == lock->holders[proc->token])
waitMask &= ~(1 << proc->token);
prevSame = (proc->waitLockMode == lockmode);
(aheadHolders[proc->waitLockMode])++;
if (aheadHolders[proc->waitLockMode] == lock->holders[proc->waitLockMode])
waitMask &= ~(1 << proc->waitLockMode);
proc = (PROC *) MAKE_PTR(proc->links.prev);
}
......@@ -692,10 +722,8 @@ ins:;
rt:;
#ifdef LOCK_DEBUG
/* Just to get meaningful debug messages from DumpLocks() */
MyProc->waitLock = (LOCK *) NULL;
#endif
MyProc->waitLock = NULL;
MyProc->waitHolder = NULL;
return MyProc->errType;
}
......@@ -704,7 +732,7 @@ rt:;
/*
* ProcWakeup -- wake up a process by releasing its private semaphore.
*
* remove the process from the wait queue and set its links invalid.
* Also remove the process from the wait queue and set its links invalid.
* RETURN: the next process in the wait queue.
*/
PROC *
......@@ -720,9 +748,9 @@ ProcWakeup(PROC *proc, int errType)
retProc = (PROC *) MAKE_PTR(proc->links.prev);
/* you have to update waitLock->waitProcs.size yourself */
SHMQueueDelete(&(proc->links));
SHMQueueElemInit(&(proc->links));
(proc->waitLock->waitProcs.size)--;
proc->errType = errType;
......@@ -736,65 +764,70 @@ ProcWakeup(PROC *proc, int errType)
* released.
*/
int
ProcLockWakeup(PROC_QUEUE *queue, LOCKMETHOD lockmethod, LOCK *lock)
ProcLockWakeup(LOCKMETHOD lockmethod, LOCK *lock)
{
PROC_QUEUE *queue = &(lock->waitProcs);
PROC *proc;
int count = 0;
int last_locktype = 0;
int awoken = 0;
LOCKMODE last_lockmode = 0;
int queue_size = queue->size;
Assert(queue->size >= 0);
Assert(queue_size >= 0);
if (!queue->size)
if (!queue_size)
return STATUS_NOT_FOUND;
proc = (PROC *) MAKE_PTR(queue->links.prev);
while ((queue_size--) && (proc))
{
/*
* This proc will conflict as the previous one did, don't even
* try.
*/
if (proc->token == last_locktype)
continue;
while (queue_size-- > 0)
{
if (proc->waitLockMode == last_lockmode)
{
/*
* This proc will conflict as the previous one did, don't even
* try.
*/
goto nextProc;
}
/*
* Does this proc conflict with locks held by others ?
*/
if (LockResolveConflicts(lockmethod,
proc->waitLockMode,
lock,
proc->token,
proc->xid,
(XIDLookupEnt *) NULL) != STATUS_OK)
proc->waitHolder,
proc,
NULL) != STATUS_OK)
{
if (count != 0)
/* Yes. Quit if we already awoke at least one process. */
if (awoken != 0)
break;
last_locktype = proc->token;
continue;
/* Otherwise, see if any later waiters can be awoken. */
last_lockmode = proc->waitLockMode;
goto nextProc;
}
/*
* there was a waiting process, grant it the lock before waking it
* up. This will prevent another process from seizing the lock
* between the time we release the lock master (spinlock) and the
* time that the awoken process begins executing again.
* OK to wake up this sleeping process.
*/
GrantLock(lock, proc->token);
GrantLock(lock, proc->waitHolder, proc->waitLockMode);
proc = ProcWakeup(proc, NO_ERROR);
awoken++;
/*
* ProcWakeup removes proc from the lock waiting process queue and
* returns the next proc in chain.
* ProcWakeup removes proc from the lock's waiting process queue
* and returns the next proc in chain; don't use prev link.
*/
continue;
count++;
queue->size--;
proc = ProcWakeup(proc, NO_ERROR);
nextProc:
proc = (PROC *) MAKE_PTR(proc->links.prev);
}
Assert(queue->size >= 0);
if (count)
if (awoken)
return STATUS_OK;
else
{
......@@ -802,9 +835,10 @@ ProcLockWakeup(PROC_QUEUE *queue, LOCKMETHOD lockmethod, LOCK *lock)
#ifdef LOCK_DEBUG
if (lock->tag.lockmethod == USER_LOCKMETHOD ? Trace_userlocks : Trace_locks)
{
elog(DEBUG, "ProcLockWakeup: lock(%lx) can't wake up any process", MAKE_OFFSET(lock));
elog(DEBUG, "ProcLockWakeup: lock(%lx) can't wake up any process",
MAKE_OFFSET(lock));
if (Debug_deadlocks)
DumpAllLocks();
DumpAllLocks();
}
#endif
return STATUS_NOT_FOUND;
......@@ -872,10 +906,12 @@ HandleDeadLock(SIGNAL_ARGS)
*/
mywaitlock = MyProc->waitLock;
Assert(mywaitlock->waitProcs.size > 0);
lockWaiting = false;
--mywaitlock->waitProcs.size;
SHMQueueDelete(&(MyProc->links));
SHMQueueElemInit(&(MyProc->links));
MyProc->waitLock = NULL;
MyProc->waitHolder = NULL;
lockWaiting = false;
/* ------------------
* Unlock my semaphore so that the interrupted ProcSleep() call can finish.
......
......@@ -7,7 +7,7 @@
* Portions Copyright (c) 1996-2000, PostgreSQL, Inc
* Portions Copyright (c) 1994, Regents of the University of California
*
* $Id: lmgr.h,v 1.26 2000/11/28 23:27:57 tgl Exp $
* $Id: lmgr.h,v 1.27 2000/12/22 00:51:54 tgl Exp $
*
*-------------------------------------------------------------------------
*/
......@@ -33,17 +33,21 @@
extern LOCKMETHOD LockTableId;
extern LOCKMETHOD InitLockTable(void);
extern LOCKMETHOD InitLockTable(int maxBackends);
extern void RelationInitLockInfo(Relation relation);
/* Lock a relation */
extern void LockRelation(Relation relation, LOCKMODE lockmode);
extern void UnlockRelation(Relation relation, LOCKMODE lockmode);
/* this is for indices */
extern void LockRelationForSession(LockRelId *relid, LOCKMODE lockmode);
extern void UnlockRelationForSession(LockRelId *relid, LOCKMODE lockmode);
/* Lock a page (mainly used for indices) */
extern void LockPage(Relation relation, BlockNumber blkno, LOCKMODE lockmode);
extern void UnlockPage(Relation relation, BlockNumber blkno, LOCKMODE lockmode);
/* and this is for transactions */
/* Lock an XID (used to wait for a transaction to finish) */
extern void XactLockTableInsert(TransactionId xid);
extern void XactLockTableWait(TransactionId xid);
......
/*-------------------------------------------------------------------------
*
* lock.h
*
* POSTGRES low-level lock mechanism
*
*
* Portions Copyright (c) 1996-2000, PostgreSQL, Inc
* Portions Copyright (c) 1994, Regents of the University of California
*
* $Id: lock.h,v 1.39 2000/07/17 03:05:30 tgl Exp $
* $Id: lock.h,v 1.40 2000/12/22 00:51:54 tgl Exp $
*
*-------------------------------------------------------------------------
*/
......@@ -18,13 +18,20 @@
#include "storage/itemptr.h"
#include "storage/shmem.h"
extern SPINLOCK LockMgrLock;
typedef int LOCKMASK;
#define INIT_TABLE_SIZE 100
#define MAX_TABLE_SIZE 1000
/* originally in procq.h */
typedef struct PROC_QUEUE
{
SHM_QUEUE links;
int size;
} PROC_QUEUE;
/* struct proc is declared in storage/proc.h, but must forward-reference it */
typedef struct proc PROC;
extern SPINLOCK LockMgrLock;
#ifdef LOCK_DEBUG
extern int Trace_lock_oidmin;
extern bool Trace_locks;
......@@ -39,14 +46,16 @@ extern bool Debug_deadlocks;
* memory the lock manager is going to require.
* See LockShmemSize() in lock.c.
*
* NLOCKS_PER_XACT - The number of unique locks acquired in a transaction
* (should be configurable!)
* NLOCKS_PER_XACT - The number of unique objects locked in a transaction
* (this should be configurable!)
* NLOCKENTS - The maximum number of lock entries in the lock table.
* ----------------------
*/
#define NLOCKS_PER_XACT 64
#define NLOCKENTS(maxBackends) (NLOCKS_PER_XACT*(maxBackends))
typedef int LOCKMASK;
typedef int LOCKMODE;
typedef int LOCKMETHOD;
......@@ -68,29 +77,8 @@ typedef int LOCKMETHOD;
#define MIN_LOCKMETHOD DEFAULT_LOCKMETHOD
typedef struct LTAG
{
Oid relId;
Oid dbId;
union
{
BlockNumber blkno;
TransactionId xid;
} objId;
/*
* offnum should be part of objId.tupleId above, but would increase
* sizeof(LOCKTAG) and so moved here; currently used by userlocks
* only.
*/
OffsetNumber offnum;
uint16 lockmethod; /* needed by userlocks */
} LOCKTAG;
#define TAGSIZE (sizeof(LOCKTAG))
#define LOCKTAG_LOCKMETHOD(locktag) ((locktag).lockmethod)
/* This is the control structure for a lock table. It
/*
* This is the control structure for a lock table. It
* lives in shared memory:
*
* lockmethod -- the handle used by the lock table's clients to
......@@ -108,7 +96,6 @@ typedef struct LTAG
* starvation).
*
* masterlock -- synchronizes access to the table
*
*/
typedef struct LOCKMETHODCTL
{
......@@ -120,91 +107,47 @@ typedef struct LOCKMETHODCTL
} LOCKMETHODCTL;
/*
* lockHash -- hash table on lock Ids,
* xidHash -- hash on xid and lockId in case
* multiple processes are holding the lock
* ctl - control structure described above.
* Non-shared header for a lock table.
*
* lockHash -- hash table holding per-locked-object lock information
* holderHash -- hash table holding per-lock-holder lock information
* ctl - shared control structure described above.
*/
typedef struct LOCKMETHODTABLE
{
HTAB *lockHash;
HTAB *xidHash;
HTAB *holderHash;
LOCKMETHODCTL *ctl;
} LOCKMETHODTABLE;
/* -----------------------
* A transaction never conflicts with its own locks. Hence, if
* multiple transactions hold non-conflicting locks on the same
* data, private per-transaction information must be stored in the
* XID table. The tag is XID + shared memory lock address so that
* all locks can use the same XID table. The private information
* we store is the number of locks of each type (holders) and the
* total number of locks (nHolding) held by the transaction.
*
* NOTE:
* There were some problems with the fact that currently TransactionIdData
* is a 5 byte entity and compilers long word aligning of structure fields.
* If the 3 byte padding is put in front of the actual xid data then the
* hash function (which uses XID_TAGSIZE when deciding how many bytes of a
* struct to look at for the key) might only see the last two bytes of the xid.
*
* Clearly this is not good since its likely that these bytes will be the
* same for many transactions and hence they will share the same entry in
* hash table causing the entry to be corrupted. For this long-winded
* reason I have put the tag in a struct of its own to ensure that the
* XID_TAGSIZE is computed correctly. It used to be sizeof (SHMEM_OFFSET) +
* sizeof(TransactionIdData) which != sizeof(XIDTAG).
*
* Finally since the hash function will now look at all 12 bytes of the tag
* the padding bytes MUST be zero'd before use in hash_search() as they
* will have random values otherwise. Jeff 22 July 1991.
* -----------------------
*/
typedef struct XIDTAG
{
SHMEM_OFFSET lock;
int pid;
TransactionId xid;
#ifdef USE_XIDTAG_LOCKMETHOD
uint16 lockmethod; /* for debug or consistency checking */
#endif
} XIDTAG;
#ifdef USE_XIDTAG_LOCKMETHOD
#define XIDTAG_LOCKMETHOD(xidtag) ((xidtag).lockmethod)
#else
#define XIDTAG_LOCKMETHOD(xidtag) \
(((LOCK*) MAKE_PTR((xidtag).lock))->tag.lockmethod)
#endif
typedef struct XIDLookupEnt
/*
* LOCKTAG is the key information needed to look up a LOCK item in the
* lock hashtable. A LOCKTAG value uniquely identifies a lockable object.
*/
typedef struct LOCKTAG
{
/* tag */
XIDTAG tag;
/* data */
int holders[MAX_LOCKMODES];
int nHolding;
SHM_QUEUE queue;
} XIDLookupEnt;
#define SHMEM_XIDTAB_KEYSIZE sizeof(XIDTAG)
#define SHMEM_XIDTAB_DATASIZE (sizeof(XIDLookupEnt) - SHMEM_XIDTAB_KEYSIZE)
Oid relId;
Oid dbId;
union
{
BlockNumber blkno;
TransactionId xid;
} objId;
#define XID_TAGSIZE (sizeof(XIDTAG))
#define XIDENT_LOCKMETHOD(xident) (XIDTAG_LOCKMETHOD((xident).tag))
/*
* offnum should be part of objId.tupleId above, but would increase
* sizeof(LOCKTAG) and so moved here; currently used by userlocks
* only.
*/
OffsetNumber offnum;
/* originally in procq.h */
typedef struct PROC_QUEUE
{
SHM_QUEUE links;
int size;
} PROC_QUEUE;
uint16 lockmethod; /* needed by userlocks */
} LOCKTAG;
/*
* lock information:
* Per-locked-object lock information:
*
* tag -- uniquely identifies the object being locked
* mask -- union of the conflict masks of all lock types
......@@ -232,40 +175,76 @@ typedef struct LOCK
#define SHMEM_LOCKTAB_KEYSIZE sizeof(LOCKTAG)
#define SHMEM_LOCKTAB_DATASIZE (sizeof(LOCK) - SHMEM_LOCKTAB_KEYSIZE)
#define LOCK_LOCKMETHOD(lock) (LOCKTAG_LOCKMETHOD((lock).tag))
#define LOCK_LOCKMETHOD(lock) ((lock).tag.lockmethod)
#define LockGetLock_nHolders(l) l->nHolders
#ifdef NOT_USED
#define LockDecrWaitHolders(lock, lockmode) \
( \
lock->nHolding--, \
lock->holders[lockmode]-- \
)
#endif
#define LockLockTable() SpinAcquire(LockMgrLock);
#define UnlockLockTable() SpinRelease(LockMgrLock);
/*
* We may have several different transactions holding or awaiting locks
* on the same lockable object. We need to store some per-holder information
* for each such holder (or would-be holder).
*
* HOLDERTAG is the key information needed to look up a HOLDER item in the
* holder hashtable. A HOLDERTAG value uniquely identifies a lock holder.
*
* There are two possible kinds of holder tags: a transaction (identified
* both by the PID of the backend running it, and the xact's own ID) and
* a session (identified by backend PID, with xid = InvalidTransactionId).
*
* Currently, session holders are used for user locks and for cross-xact
* locks obtained for VACUUM. We assume that a session lock never conflicts
* with per-transaction locks obtained by the same backend.
*/
typedef struct HOLDERTAG
{
SHMEM_OFFSET lock; /* link to per-lockable-object information */
int pid; /* PID of backend */
TransactionId xid; /* xact ID, or InvalidTransactionId */
} HOLDERTAG;
typedef struct HOLDER
{
/* tag */
HOLDERTAG tag;
/* data */
int holders[MAX_LOCKMODES];
int nHolding;
SHM_QUEUE queue;
} HOLDER;
#define SHMEM_HOLDERTAB_KEYSIZE sizeof(HOLDERTAG)
#define SHMEM_HOLDERTAB_DATASIZE (sizeof(HOLDER) - SHMEM_HOLDERTAB_KEYSIZE)
#define HOLDER_LOCKMETHOD(holder) \
(((LOCK *) MAKE_PTR((holder).tag.lock))->tag.lockmethod)
#define LockLockTable() SpinAcquire(LockMgrLock)
#define UnlockLockTable() SpinRelease(LockMgrLock)
/*
* function prototypes
*/
extern void InitLocks(void);
extern void LockDisable(int status);
extern void LockDisable(bool status);
extern bool LockingDisabled(void);
extern LOCKMETHOD LockMethodTableInit(char *tabName, LOCKMASK *conflictsP,
int *prioP, int numModes);
int *prioP, int numModes, int maxBackends);
extern LOCKMETHOD LockMethodTableRename(LOCKMETHOD lockmethod);
extern bool LockAcquire(LOCKMETHOD lockmethod, LOCKTAG *locktag,
LOCKMODE lockmode);
extern int LockResolveConflicts(LOCKMETHOD lockmethod, LOCK *lock,
LOCKMODE lockmode, TransactionId xid,
XIDLookupEnt *xidentP);
TransactionId xid, LOCKMODE lockmode);
extern bool LockRelease(LOCKMETHOD lockmethod, LOCKTAG *locktag,
LOCKMODE lockmode);
extern void GrantLock(LOCK *lock, LOCKMODE lockmode);
extern bool LockReleaseAll(LOCKMETHOD lockmethod, SHM_QUEUE *lockQueue);
TransactionId xid, LOCKMODE lockmode);
extern bool LockReleaseAll(LOCKMETHOD lockmethod, PROC *proc,
bool allxids, TransactionId xid);
extern int LockResolveConflicts(LOCKMETHOD lockmethod, LOCKMODE lockmode,
LOCK *lock, HOLDER *holder, PROC *proc,
int *myHolders);
extern void GrantLock(LOCK *lock, HOLDER *holder, LOCKMODE lockmode);
extern int LockShmemSize(int maxBackends);
extern bool LockingDisabled(void);
extern bool DeadLockCheck(void *proc, LOCK *findlock);
extern bool DeadLockCheck(PROC *thisProc, LOCK *findlock);
#ifdef LOCK_DEBUG
extern void DumpLocks(void);
......
......@@ -7,7 +7,7 @@
* Portions Copyright (c) 1996-2000, PostgreSQL, Inc
* Portions Copyright (c) 1994, Regents of the University of California
*
* $Id: proc.h,v 1.32 2000/11/28 23:27:57 tgl Exp $
* $Id: proc.h,v 1.33 2000/12/22 00:51:54 tgl Exp $
*
*-------------------------------------------------------------------------
*/
......@@ -30,7 +30,7 @@ typedef struct
/*
* Each backend has:
*/
typedef struct proc
struct proc
{
/* proc->links MUST BE THE FIRST ELEMENT OF STRUCT (see ProcWakeup()) */
......@@ -50,16 +50,25 @@ typedef struct proc
TransactionId xmin; /* minimal running XID as it was when we
* were starting our xact: vacuum must not
* remove tuples deleted by xid >= xmin ! */
XLogRecPtr logRec;
LOCK *waitLock; /* Lock we're sleeping on ... */
int token; /* type of lock we sleeping for */
int holdLock; /* while holding these locks */
/* Info about lock the process is currently waiting for, if any */
LOCK *waitLock; /* Lock object we're sleeping on ... */
HOLDER *waitHolder; /* Per-holder info for our lock */
LOCKMODE waitLockMode; /* type of lock we're waiting for */
LOCKMASK holdLock; /* bitmask for lock types already held */
int pid; /* This backend's process id */
Oid databaseId; /* OID of database this backend is using */
short sLocks[MAX_SPINS]; /* Spin lock stats */
SHM_QUEUE lockQueue; /* locks associated with current
* transaction */
} PROC;
};
/* NOTE: "typedef struct proc PROC" appears in storage/lock.h. */
extern PROC *MyProc;
......@@ -122,15 +131,14 @@ typedef struct procglobal
*/
extern void InitProcGlobal(int maxBackends);
extern void InitProcess(void);
extern void ProcReleaseLocks(void);
extern void ProcReleaseLocks(bool isCommit);
extern bool ProcRemove(int pid);
extern void ProcQueueInit(PROC_QUEUE *queue);
extern int ProcSleep(PROC_QUEUE *queue, LOCKMETHODCTL *lockctl, int token,
LOCK *lock);
extern int ProcSleep(LOCKMETHODCTL *lockctl, LOCKMODE lockmode,
LOCK *lock, HOLDER *holder);
extern PROC *ProcWakeup(PROC *proc, int errType);
extern int ProcLockWakeup(PROC_QUEUE *queue, LOCKMETHOD lockmethod,
LOCK *lock);
extern int ProcLockWakeup(LOCKMETHOD lockmethod, LOCK *lock);
extern void ProcAddLock(SHM_QUEUE *elem);
extern void ProcReleaseSpins(PROC *proc);
extern void LockWaitCancel(void);
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment