Commit 7dbcf31b authored by Marc G. Fournier's avatar Marc G. Fournier

From: Massimo Dal Zotto <dz@cs.unitn.it>

lock.patch

        I have rewritten lock.c cleaning up the code and adding better
        assert checking I have also added some fields to the lock and
        xid tags for better support of user locks. There is also a new
        function which returns an array of pids owning a lock.
        I'm using this code from over six months and it works fine.
parent 1acf0d85
...@@ -7,7 +7,7 @@ ...@@ -7,7 +7,7 @@
* *
* *
* IDENTIFICATION * IDENTIFICATION
* $Header: /cvsroot/pgsql/src/backend/storage/lmgr/lock.c,v 1.32 1998/06/30 02:33:31 momjian Exp $ * $Header: /cvsroot/pgsql/src/backend/storage/lmgr/lock.c,v 1.33 1998/08/25 21:20:26 scrappy Exp $
* *
* NOTES * NOTES
* Outside modules can create a lock table and acquire/release * Outside modules can create a lock table and acquire/release
...@@ -18,11 +18,9 @@ ...@@ -18,11 +18,9 @@
* *
* Interface: * Interface:
* *
* LockAcquire(), LockRelease(), LockMethodTableInit(). * LockAcquire(), LockRelease(), LockMethodTableInit(),
* * LockMethodTableRename(), LockReleaseAll, LockOwners()
* LockReplace() is called only within this module and by the * LockResolveConflicts(), GrantLock()
* lkchain module. It releases a lock without looking
* the lock up in the lock table.
* *
* NOTE: This module is used to define new lock tables. The * NOTE: This module is used to define new lock tables. The
* multi-level lock table (multi.c) used by the heap * multi-level lock table (multi.c) used by the heap
...@@ -35,6 +33,7 @@ ...@@ -35,6 +33,7 @@
#include <string.h> #include <string.h>
#include <sys/types.h> #include <sys/types.h>
#include <unistd.h> #include <unistd.h>
#include <signal.h>
#include "postgres.h" #include "postgres.h"
#include "miscadmin.h" #include "miscadmin.h"
...@@ -49,25 +48,110 @@ ...@@ -49,25 +48,110 @@
#include "utils/palloc.h" #include "utils/palloc.h"
#include "access/xact.h" #include "access/xact.h"
#include "access/transam.h" #include "access/transam.h"
#include "utils/trace.h"
#include "utils/ps_status.h"
static int static int WaitOnLock(LOCKMETHOD lockmethod, LOCK *lock, LOCKMODE lockmode,
WaitOnLock(LOCKMETHOD lockmethod, LOCK *lock, LOCKMODE lockmode); TransactionId xid);
/*#define LOCK_MGR_DEBUG*/
#ifndef LOCK_MGR_DEBUG
#define LOCK_PRINT(where,tag,type) /*
#define LOCK_DUMP(where,lock,type) * lockDebugRelation can be used to trace unconditionally a single relation,
#define LOCK_DUMP_AUX(where,lock,type) * for example pg_listener, if you suspect there are locking problems.
*
* lockDebugOidMin is is used to avoid tracing postgres relations, which
* would produce a lot of output. Unfortunately most system relations are
* created after bootstrap and have oid greater than BootstrapObjectIdData.
* If you are using tprintf you should specify a value greater than the max
* oid of system relations, which can be found with the following query:
*
* select max(int4in(int4out(oid))) from pg_class where relname ~ '^pg_';
*
* To get a useful lock trace you can use the following pg_options:
*
* -T "verbose,query,locks,userlocks,lock_debug_oidmin=17500"
*/
#define LOCKDEBUG(lockmethod) (pg_options[TRACE_SHORTLOCKS+lockmethod])
#define lockDebugRelation (pg_options[TRACE_LOCKRELATION])
#define lockDebugOidMin (pg_options[TRACE_LOCKOIDMIN])
#define lockReadPriority (pg_options[OPT_LOCKREADPRIORITY])
#ifdef LOCK_MGR_DEBUG
#define LOCK_PRINT(where,lock,type) \
if (((LOCKDEBUG(LOCK_LOCKMETHOD(*(lock))) >= 1) \
&& (lock->tag.relId >= lockDebugOidMin)) \
|| (lock->tag.relId == lockDebugRelation)) \
LOCK_PRINT_AUX(where,lock,type)
#define LOCK_PRINT_AUX(where,lock,type) \
TPRINTF(TRACE_ALL, \
"%s: lock(%x) tbl(%d) rel(%d) db(%d) tid(%d,%d) mask(%x) " \
"hold(%d,%d,%d,%d,%d)=%d " \
"act(%d,%d,%d,%d,%d)=%d wait(%d) type(%s)", \
where, \
MAKE_OFFSET(lock), \
lock->tag.lockmethod, \
lock->tag.relId, \
lock->tag.dbId, \
((lock->tag.tupleId.ip_blkid.bi_hi<<16)+ \
lock->tag.tupleId.ip_blkid.bi_lo), \
lock->tag.tupleId.ip_posid, \
lock->mask, \
lock->holders[1], \
lock->holders[2], \
lock->holders[3], \
lock->holders[4], \
lock->holders[5], \
lock->nHolding, \
lock->activeHolders[1], \
lock->activeHolders[2], \
lock->activeHolders[3], \
lock->activeHolders[4], \
lock->activeHolders[5], \
lock->nActive, \
lock->waitProcs.size, \
lock_types[type])
#define XID_PRINT(where,xidentP) \
if (((LOCKDEBUG(XIDENT_LOCKMETHOD(*(xidentP))) >= 1) \
&& (((LOCK *)MAKE_PTR(xidentP->tag.lock))->tag.relId \
>= lockDebugOidMin)) \
|| (((LOCK *)MAKE_PTR(xidentP->tag.lock))->tag.relId \
== lockDebugRelation)) \
XID_PRINT_AUX(where,xidentP)
#define XID_PRINT_AUX(where,xidentP) \
TPRINTF(TRACE_ALL, \
"%s: xid(%x) lock(%x) tbl(%d) pid(%d) xid(%d) " \
"hold(%d,%d,%d,%d,%d)=%d", \
where, \
MAKE_OFFSET(xidentP), \
xidentP->tag.lock, \
XIDENT_LOCKMETHOD(*(xidentP)), \
xidentP->tag.pid, \
xidentP->tag.xid, \
xidentP->holders[1], \
xidentP->holders[2], \
xidentP->holders[3], \
xidentP->holders[4], \
xidentP->holders[5], \
xidentP->nHolding)
#define LOCK_TPRINTF(lock, args...) \
if (((LOCKDEBUG(LOCK_LOCKMETHOD(*(lock))) >= 1) \
&& (lock->tag.relId >= lockDebugOidMin)) \
|| (lock->tag.relId == lockDebugRelation)) \
TPRINTF(TRACE_ALL, args)
#else /* !LOCK_MGR_DEBUG */
#define LOCK_PRINT(where,lock,type)
#define LOCK_PRINT_AUX(where,lock,type)
#define XID_PRINT(where,xidentP) #define XID_PRINT(where,xidentP)
#define XID_PRINT_AUX(where,xidentP)
#define LOCK_TPRINTF(lock, args...)
#endif /* !LOCK_MGR_DEBUG */
#else /* LOCK_MGR_DEBUG */
int lockDebug = 0;
unsigned int lock_debug_oid_min = BootstrapObjectIdData;
static char *lock_types[] = { static char *lock_types[] = {
"NONE", "",
"WRITE", "WRITE",
"READ", "READ",
"WRITE INTENT", "WRITE INTENT",
...@@ -75,59 +159,6 @@ static char *lock_types[] = { ...@@ -75,59 +159,6 @@ static char *lock_types[] = {
"EXTEND" "EXTEND"
}; };
#define LOCK_PRINT(where,tag,type)\
if ((lockDebug >= 1) && (tag->relId >= lock_debug_oid_min)) \
elog(DEBUG, \
"%s: pid (%d) rel (%d) dbid (%d) tid (%d,%d) type (%s)",where, \
MyProcPid,\
tag->relId, tag->dbId, \
((tag->tupleId.ip_blkid.bi_hi<<16)+\
tag->tupleId.ip_blkid.bi_lo),\
tag->tupleId.ip_posid, \
lock_types[type])
#define LOCK_DUMP(where,lock,type)\
if ((lockDebug >= 1) && (lock->tag.relId >= lock_debug_oid_min)) \
LOCK_DUMP_AUX(where,lock,type)
#define LOCK_DUMP_AUX(where,lock,type)\
elog(DEBUG, \
"%s: pid (%d) rel (%d) dbid (%d) tid (%d,%d) nHolding (%d) "\
"holders (%d,%d,%d,%d,%d) type (%s)",where, \
MyProcPid,\
lock->tag.relId, lock->tag.dbId, \
((lock->tag.tupleId.ip_blkid.bi_hi<<16)+\
lock->tag.tupleId.ip_blkid.bi_lo),\
lock->tag.tupleId.ip_posid, \
lock->nHolding,\
lock->holders[1],\
lock->holders[2],\
lock->holders[3],\
lock->holders[4],\
lock->holders[5],\
lock_types[type])
#define XID_PRINT(where,xidentP)\
if ((lockDebug >= 2) && \
(((LOCK *)MAKE_PTR(xidentP->tag.lock))->tag.relId \
>= lock_debug_oid_min)) \
elog(DEBUG,\
"%s: pid (%d) xid (%d) pid (%d) lock (%x) nHolding (%d) "\
"holders (%d,%d,%d,%d,%d)",\
where,\
MyProcPid,\
xidentP->tag.xid,\
xidentP->tag.pid,\
xidentP->tag.lock,\
xidentP->nHolding,\
xidentP->holders[1],\
xidentP->holders[2],\
xidentP->holders[3],\
xidentP->holders[4],\
xidentP->holders[5])
#endif /* LOCK_MGR_DEBUG */
SPINLOCK LockMgrLock; /* in Shmem or created in SPINLOCK LockMgrLock; /* in Shmem or created in
* CreateSpinlocks() */ * CreateSpinlocks() */
...@@ -171,6 +202,16 @@ InitLocks() ...@@ -171,6 +202,16 @@ InitLocks()
BITS_ON[i] = bit; BITS_ON[i] = bit;
BITS_OFF[i] = ~bit; BITS_OFF[i] = ~bit;
} }
#ifdef LOCK_MGR_DEBUG
/*
* If lockDebugOidMin value has not been specified
* in pg_options set a default value.
*/
if (!lockDebugOidMin) {
lockDebugOidMin = BootstrapObjectIdData;
}
#endif
} }
/* ------------------- /* -------------------
...@@ -234,7 +275,7 @@ LockMethodTableInit(char *tabName, ...@@ -234,7 +275,7 @@ LockMethodTableInit(char *tabName,
{ {
elog(NOTICE, "LockMethodTableInit: too many lock types %d greater than %d", elog(NOTICE, "LockMethodTableInit: too many lock types %d greater than %d",
numModes, MAX_LOCKMODES); numModes, MAX_LOCKMODES);
return (INVALID_TABLEID); return (INVALID_LOCKMETHOD);
} }
/* allocate a string for the shmem index table lookup */ /* allocate a string for the shmem index table lookup */
...@@ -242,7 +283,7 @@ LockMethodTableInit(char *tabName, ...@@ -242,7 +283,7 @@ LockMethodTableInit(char *tabName,
if (!shmemName) if (!shmemName)
{ {
elog(NOTICE, "LockMethodTableInit: couldn't malloc string %s \n", tabName); elog(NOTICE, "LockMethodTableInit: couldn't malloc string %s \n", tabName);
return (INVALID_TABLEID); return (INVALID_LOCKMETHOD);
} }
/* each lock table has a non-shared header */ /* each lock table has a non-shared header */
...@@ -251,7 +292,7 @@ LockMethodTableInit(char *tabName, ...@@ -251,7 +292,7 @@ LockMethodTableInit(char *tabName,
{ {
elog(NOTICE, "LockMethodTableInit: couldn't malloc lock table %s\n", tabName); elog(NOTICE, "LockMethodTableInit: couldn't malloc lock table %s\n", tabName);
pfree(shmemName); pfree(shmemName);
return (INVALID_TABLEID); return (INVALID_LOCKMETHOD);
} }
/* ------------------------ /* ------------------------
...@@ -354,7 +395,7 @@ LockMethodTableInit(char *tabName, ...@@ -354,7 +395,7 @@ LockMethodTableInit(char *tabName,
if (status) if (status)
return (lockMethodTable->ctl->lockmethod); return (lockMethodTable->ctl->lockmethod);
else else
return (INVALID_TABLEID); return (INVALID_LOCKMETHOD);
} }
/* /*
...@@ -369,16 +410,16 @@ LockMethodTableInit(char *tabName, ...@@ -369,16 +410,16 @@ LockMethodTableInit(char *tabName,
* client to use different lockmethods when acquiring/releasing * client to use different lockmethods when acquiring/releasing
* short term and long term locks. * short term and long term locks.
*/ */
#ifdef NOT_USED
LOCKMETHOD LOCKMETHOD
LockMethodTableRename(LOCKMETHOD lockmethod) LockMethodTableRename(LOCKMETHOD lockmethod)
{ {
LOCKMETHOD newLockMethod; LOCKMETHOD newLockMethod;
if (NumLockMethods >= MAX_LOCK_METHODS) if (NumLockMethods >= MAX_LOCK_METHODS)
return (INVALID_TABLEID); return (INVALID_LOCKMETHOD);
if (LockMethodTable[lockmethod] == INVALID_TABLEID) if (LockMethodTable[lockmethod] == INVALID_LOCKMETHOD)
return (INVALID_TABLEID); return (INVALID_LOCKMETHOD);
/* other modules refer to the lock table by a lockmethod */ /* other modules refer to the lock table by a lockmethod */
newLockMethod = NumLockMethods; newLockMethod = NumLockMethods;
...@@ -387,7 +428,6 @@ LockMethodTableRename(LOCKMETHOD lockmethod) ...@@ -387,7 +428,6 @@ LockMethodTableRename(LOCKMETHOD lockmethod)
LockMethodTable[newLockMethod] = LockMethodTable[lockmethod]; LockMethodTable[newLockMethod] = LockMethodTable[lockmethod];
return (newLockMethod); return (newLockMethod);
} }
#endif
/* /*
* LockAcquire -- Check for lock conflicts, sleep if conflict found, * LockAcquire -- Check for lock conflicts, sleep if conflict found,
...@@ -398,8 +438,11 @@ LockMethodTableRename(LOCKMETHOD lockmethod) ...@@ -398,8 +438,11 @@ LockMethodTableRename(LOCKMETHOD lockmethod)
* Side Effects: The lock is always acquired. No way to abort * Side Effects: The lock is always acquired. No way to abort
* a lock acquisition other than aborting the transaction. * a lock acquisition other than aborting the transaction.
* Lock is recorded in the lkchain. * Lock is recorded in the lkchain.
*
#ifdef USER_LOCKS #ifdef USER_LOCKS
*
* Note on User Locks: * Note on User Locks:
*
* User locks are handled totally on the application side as * User locks are handled totally on the application side as
* long term cooperative locks which extend beyond the normal * long term cooperative locks which extend beyond the normal
* transaction boundaries. Their purpose is to indicate to an * transaction boundaries. Their purpose is to indicate to an
...@@ -421,24 +464,24 @@ LockMethodTableRename(LOCKMETHOD lockmethod) ...@@ -421,24 +464,24 @@ LockMethodTableRename(LOCKMETHOD lockmethod)
* acquired if already held by another process. They must be * acquired if already held by another process. They must be
* released explicitly by the application but they are released * released explicitly by the application but they are released
* automatically when a backend terminates. * automatically when a backend terminates.
* They are indicated by a dummy lockmethod 0 which doesn't have * They are indicated by a lockmethod 2 which is an alias for the
* any table allocated but uses the normal lock table, and are * normal lock table, and are distinguished from normal locks
* distinguished from normal locks for the following differences: * for the following differences:
* *
* normal lock user lock * normal lock user lock
* *
* lockmethod 1 0 * lockmethod 1 2
* tag.relId rel oid 0 * tag.relId rel oid 0
* tag.ItemPointerData.ip_blkid block id lock id2 * tag.ItemPointerData.ip_blkid block id lock id2
* tag.ItemPointerData.ip_posid tuple offset lock id1 * tag.ItemPointerData.ip_posid tuple offset lock id1
* xid.pid 0 backend pid * xid.pid 0 backend pid
* xid.xid current xid 0 * xid.xid xid or 0 0
* persistence transaction user or backend * persistence transaction user or backend
* *
* The lockmode parameter can have the same values for normal locks * The lockmode parameter can have the same values for normal locks
* although probably only WRITE_LOCK can have some practical use. * although probably only WRITE_LOCK can have some practical use.
* *
* DZ - 4 Oct 1996 * DZ - 22 Nov 1997
#endif #endif
*/ */
...@@ -453,25 +496,27 @@ LockAcquire(LOCKMETHOD lockmethod, LOCKTAG *locktag, LOCKMODE lockmode) ...@@ -453,25 +496,27 @@ LockAcquire(LOCKMETHOD lockmethod, LOCKTAG *locktag, LOCKMODE lockmode)
SPINLOCK masterLock; SPINLOCK masterLock;
LOCKMETHODTABLE *lockMethodTable; LOCKMETHODTABLE *lockMethodTable;
int status; int status;
TransactionId myXid; TransactionId xid;
#ifdef USER_LOCKS #ifdef USER_LOCKS
int is_user_lock; int is_user_lock;
is_user_lock = (lockmethod == 0); is_user_lock = (lockmethod == USER_LOCKMETHOD);
if (is_user_lock) if (is_user_lock)
{ {
lockmethod = 1;
#ifdef USER_LOCKS_DEBUG #ifdef USER_LOCKS_DEBUG
elog(NOTICE, "LockAcquire: user lock tag [%u,%u] %d", TPRINTF(TRACE_USERLOCKS, "LockAcquire: user lock [%u,%u] %s",
locktag->tupleId.ip_posid, locktag->tupleId.ip_posid,
((locktag->tupleId.ip_blkid.bi_hi << 16) + ((locktag->tupleId.ip_blkid.bi_hi << 16) +
locktag->tupleId.ip_blkid.bi_lo), locktag->tupleId.ip_blkid.bi_lo),
lockmode); lock_types[lockmode]);
#endif #endif
} }
#endif #endif
/* ???????? This must be changed when short term locks will be used */
locktag->lockmethod = lockmethod;
Assert(lockmethod < NumLockMethods); Assert(lockmethod < NumLockMethods);
lockMethodTable = LockMethodTable[lockmethod]; lockMethodTable = LockMethodTable[lockmethod];
if (!lockMethodTable) if (!lockMethodTable)
...@@ -483,14 +528,16 @@ LockAcquire(LOCKMETHOD lockmethod, LOCKTAG *locktag, LOCKMODE lockmode) ...@@ -483,14 +528,16 @@ LockAcquire(LOCKMETHOD lockmethod, LOCKTAG *locktag, LOCKMODE lockmode)
if (LockingIsDisabled) if (LockingIsDisabled)
return (TRUE); return (TRUE);
LOCK_PRINT("Acquire", locktag, lockmode);
masterLock = lockMethodTable->ctl->masterLock; masterLock = lockMethodTable->ctl->masterLock;
SpinAcquire(masterLock); SpinAcquire(masterLock);
/*
* Find or create a lock with this tag
*/
Assert(lockMethodTable->lockHash->hash == tag_hash); Assert(lockMethodTable->lockHash->hash == tag_hash);
lock = (LOCK *) hash_search(lockMethodTable->lockHash, (Pointer) locktag, HASH_ENTER, &found); lock = (LOCK *) hash_search(lockMethodTable->lockHash, (Pointer) locktag,
HASH_ENTER, &found);
if (!lock) if (!lock)
{ {
SpinRelease(masterLock); SpinRelease(masterLock);
...@@ -505,15 +552,19 @@ LockAcquire(LOCKMETHOD lockmethod, LOCKTAG *locktag, LOCKMODE lockmode) ...@@ -505,15 +552,19 @@ LockAcquire(LOCKMETHOD lockmethod, LOCKTAG *locktag, LOCKMODE lockmode)
if (!found) if (!found)
{ {
lock->mask = 0; lock->mask = 0;
ProcQueueInit(&(lock->waitProcs));
MemSet((char *) lock->holders, 0, sizeof(int) * MAX_LOCKMODES);
MemSet((char *) lock->activeHolders, 0, sizeof(int) * MAX_LOCKMODES);
lock->nHolding = 0; lock->nHolding = 0;
lock->nActive = 0; lock->nActive = 0;
MemSet((char *) lock->holders, 0, sizeof(int) * MAX_LOCKMODES);
MemSet((char *) lock->activeHolders, 0, sizeof(int) * MAX_LOCKMODES);
ProcQueueInit(&(lock->waitProcs));
Assert(BlockIdEquals(&(lock->tag.tupleId.ip_blkid), Assert(BlockIdEquals(&(lock->tag.tupleId.ip_blkid),
&(locktag->tupleId.ip_blkid))); &(locktag->tupleId.ip_blkid)));
LOCK_PRINT("LockAcquire: new", lock, lockmode);
} else {
LOCK_PRINT("LockAcquire: found", lock, lockmode);
Assert((lock->nHolding > 0) && (lock->holders[lockmode] >= 0));
Assert((lock->nActive > 0) && (lock->activeHolders[lockmode] >= 0));
Assert(lock->nActive <= lock->nHolding);
} }
/* ------------------ /* ------------------
...@@ -522,7 +573,6 @@ LockAcquire(LOCKMETHOD lockmethod, LOCKTAG *locktag, LOCKMODE lockmode) ...@@ -522,7 +573,6 @@ LockAcquire(LOCKMETHOD lockmethod, LOCKTAG *locktag, LOCKMODE lockmode)
* ------------------ * ------------------
*/ */
xidTable = lockMethodTable->xidHash; xidTable = lockMethodTable->xidHash;
myXid = GetCurrentTransactionId();
/* ------------------ /* ------------------
* Zero out all of the tag bytes (this clears the padding bytes for long * Zero out all of the tag bytes (this clears the padding bytes for long
...@@ -530,36 +580,48 @@ LockAcquire(LOCKMETHOD lockmethod, LOCKTAG *locktag, LOCKMODE lockmode) ...@@ -530,36 +580,48 @@ LockAcquire(LOCKMETHOD lockmethod, LOCKTAG *locktag, LOCKMODE lockmode)
* ------------------ * ------------------
*/ */
MemSet(&item, 0, XID_TAGSIZE); /* must clear padding, needed */ MemSet(&item, 0, XID_TAGSIZE); /* must clear padding, needed */
TransactionIdStore(myXid, &item.tag.xid);
item.tag.lock = MAKE_OFFSET(lock); item.tag.lock = MAKE_OFFSET(lock);
#if 0 #ifdef USE_XIDTAG_LOCKMETHOD
item.tag.pid = MyPid; item.tag.lockmethod = lockmethod;
#endif #endif
#ifdef USER_LOCKS #ifdef USER_LOCKS
if (is_user_lock) if (is_user_lock)
{ {
item.tag.pid = MyProcPid; item.tag.pid = MyProcPid;
item.tag.xid = myXid = 0; item.tag.xid = xid = 0;
#ifdef USER_LOCKS_DEBUG } else {
elog(NOTICE, "LockAcquire: user lock xid [%d,%d,%d]", xid = GetCurrentTransactionId();
item.tag.lock, item.tag.pid, item.tag.xid); TransactionIdStore(xid, &item.tag.xid);
#endif
} }
#else
xid = GetCurrentTransactionId();
TransactionIdStore(xid, &item.tag.xid);
#endif #endif
result = (XIDLookupEnt *) hash_search(xidTable, (Pointer) &item, HASH_ENTER, &found); /*
* Find or create an xid entry with this tag
*/
result = (XIDLookupEnt *) hash_search(xidTable, (Pointer) &item,
HASH_ENTER, &found);
if (!result) if (!result)
{ {
elog(NOTICE, "LockAcquire: xid table corrupted"); elog(NOTICE, "LockAcquire: xid table corrupted");
return (STATUS_ERROR); return (STATUS_ERROR);
} }
/*
* If not found initialize the new entry
*/
if (!found) if (!found)
{ {
XID_PRINT("LockAcquire: queueing XidEnt", result);
ProcAddLock(&result->queue);
result->nHolding = 0; result->nHolding = 0;
MemSet((char *) result->holders, 0, sizeof(int) * MAX_LOCKMODES); MemSet((char *) result->holders, 0, sizeof(int) * MAX_LOCKMODES);
ProcAddLock(&result->queue);
XID_PRINT("LockAcquire: new", result);
} else {
XID_PRINT("LockAcquire: found", result);
Assert((result->nHolding > 0) && (result->holders[lockmode] >= 0));
Assert(result->nHolding <= lock->nActive);
} }
/* ---------------- /* ----------------
...@@ -570,6 +632,7 @@ LockAcquire(LOCKMETHOD lockmethod, LOCKTAG *locktag, LOCKMODE lockmode) ...@@ -570,6 +632,7 @@ LockAcquire(LOCKMETHOD lockmethod, LOCKTAG *locktag, LOCKMODE lockmode)
*/ */
lock->nHolding++; lock->nHolding++;
lock->holders[lockmode]++; lock->holders[lockmode]++;
Assert((lock->nHolding > 0) && (lock->holders[lockmode] > 0));
/* -------------------- /* --------------------
* If I'm the only one holding a lock, then there * If I'm the only one holding a lock, then there
...@@ -582,43 +645,59 @@ LockAcquire(LOCKMETHOD lockmethod, LOCKTAG *locktag, LOCKMODE lockmode) ...@@ -582,43 +645,59 @@ LockAcquire(LOCKMETHOD lockmethod, LOCKTAG *locktag, LOCKMODE lockmode)
{ {
result->holders[lockmode]++; result->holders[lockmode]++;
result->nHolding++; result->nHolding++;
XID_PRINT("LockAcquire: owning", result);
Assert((result->nHolding > 0) && (result->holders[lockmode] > 0));
GrantLock(lock, lockmode); GrantLock(lock, lockmode);
SpinRelease(masterLock); SpinRelease(masterLock);
return (TRUE); return (TRUE);
} }
Assert(result->nHolding <= lock->nActive); status = LockResolveConflicts(lockmethod, lock, lockmode, xid, result);
status = LockResolveConflicts(lockmethod, lock, lockmode, myXid);
if (status == STATUS_OK) if (status == STATUS_OK)
GrantLock(lock, lockmode); GrantLock(lock, lockmode);
else if (status == STATUS_FOUND) else if (status == STATUS_FOUND)
{ {
#ifdef USER_LOCKS #ifdef USER_LOCKS
/* /*
* User locks are non blocking. If we can't acquire a lock remove * User locks are non blocking. If we can't acquire a lock we must
* the xid entry and return FALSE without waiting. * remove the xid entry and return FALSE without waiting.
*/ */
if (is_user_lock) if (is_user_lock)
{ {
if (!result->nHolding) if (!result->nHolding)
{ {
SHMQueueDelete(&result->queue); SHMQueueDelete(&result->queue);
hash_search(xidTable, (Pointer) &item, HASH_REMOVE, &found); result = (XIDLookupEnt *) hash_search(xidTable,
(Pointer) &result,
HASH_REMOVE, &found);
if (!result || !found) {
elog(NOTICE, "LockAcquire: remove xid, table corrupted");
}
} else {
XID_PRINT_AUX("LockAcquire: NHOLDING", result);
} }
lock->nHolding--; lock->nHolding--;
lock->holders[lockmode]--; lock->holders[lockmode]--;
LOCK_PRINT("LockAcquire: user lock failed", lock, lockmode);
Assert((lock->nHolding > 0) && (lock->holders[lockmode] >= 0));
Assert(lock->nActive <= lock->nHolding);
SpinRelease(masterLock); SpinRelease(masterLock);
#ifdef USER_LOCKS_DEBUG
elog(NOTICE, "LockAcquire: user lock failed");
#endif
return (FALSE); return (FALSE);
} }
#endif #endif
status = WaitOnLock(lockmethod, lock, lockmode); status = WaitOnLock(lockmethod, lock, lockmode, xid);
XID_PRINT("Someone granted me the lock", result); /*
* Check the xid entry status, in case something in the
* ipc communication doesn't work correctly.
*/
if (!((result->nHolding > 0) && (result->holders[lockmode] > 0))) {
XID_PRINT_AUX("LockAcquire: INCONSISTENT ", result);
LOCK_PRINT_AUX("LockAcquire: INCONSISTENT ", lock, lockmode);
/* Should we retry ? */
return (FALSE);
}
XID_PRINT("LockAcquire: granted", result);
LOCK_PRINT("LockAcquire: granted", lock, lockmode);
} }
SpinRelease(masterLock); SpinRelease(masterLock);
...@@ -646,7 +725,8 @@ int ...@@ -646,7 +725,8 @@ int
LockResolveConflicts(LOCKMETHOD lockmethod, LockResolveConflicts(LOCKMETHOD lockmethod,
LOCK *lock, LOCK *lock,
LOCKMODE lockmode, LOCKMODE lockmode,
TransactionId xid) TransactionId xid,
XIDLookupEnt *xidentP) /* xident ptr or NULL */
{ {
XIDLookupEnt *result, XIDLookupEnt *result,
item; item;
...@@ -657,44 +737,81 @@ LockResolveConflicts(LOCKMETHOD lockmethod, ...@@ -657,44 +737,81 @@ LockResolveConflicts(LOCKMETHOD lockmethod,
int bitmask; int bitmask;
int i, int i,
tmpMask; tmpMask;
#ifdef USER_LOCKS
int is_user_lock;
#endif
numLockModes = LockMethodTable[lockmethod]->ctl->numLockModes; numLockModes = LockMethodTable[lockmethod]->ctl->numLockModes;
xidTable = LockMethodTable[lockmethod]->xidHash; xidTable = LockMethodTable[lockmethod]->xidHash;
/* --------------------- if (xidentP) {
* read my own statistics from the xid table. If there /*
* isn't an entry, then we'll just add one. * A pointer to the xid entry was supplied from the caller.
* * Actually only LockAcquire can do it.
* Zero out the tag, this clears the padding bytes for long */
* word alignment and ensures hashing consistency. result = xidentP;
* ------------------ } else {
*/ /* ---------------------
MemSet(&item, 0, XID_TAGSIZE); * read my own statistics from the xid table. If there
TransactionIdStore(xid, &item.tag.xid); * isn't an entry, then we'll just add one.
item.tag.lock = MAKE_OFFSET(lock); *
#if 0 * Zero out the tag, this clears the padding bytes for long
item.tag.pid = pid; * word alignment and ensures hashing consistency.
* ------------------
*/
MemSet(&item, 0, XID_TAGSIZE);
item.tag.lock = MAKE_OFFSET(lock);
#ifdef USE_XIDTAG_LOCKMETHOD
item.tag.lockmethod = lockmethod;
#endif
#ifdef USER_LOCKS
is_user_lock = (lockmethod == 2);
if (is_user_lock) {
item.tag.pid = MyProcPid;
item.tag.xid = 0;
} else {
TransactionIdStore(xid, &item.tag.xid);
}
#else
TransactionIdStore(xid, &item.tag.xid);
#endif #endif
if (!(result = (XIDLookupEnt *) /*
hash_search(xidTable, (Pointer) &item, HASH_ENTER, &found))) * Find or create an xid entry with this tag
{ */
elog(NOTICE, "LockResolveConflicts: xid table corrupted"); result = (XIDLookupEnt *) hash_search(xidTable, (Pointer) &item,
return (STATUS_ERROR); HASH_ENTER, &found);
} if (!result)
myHolders = result->holders; {
elog(NOTICE, "LockResolveConflicts: xid table corrupted");
return (STATUS_ERROR);
}
if (!found) /*
{ * If not found initialize the new entry. THIS SHOULD NEVER HAPPEN,
/* --------------- * if we are trying to resolve a conflict we must already have
* we're not holding any type of lock yet. Clear * allocated an xid entry for this lock. dz 21-11-1997
* the lock stats.
* ---------------
*/ */
MemSet(result->holders, 0, numLockModes * sizeof(*(lock->holders))); if (!found)
result->nHolding = 0; {
/* ---------------
* we're not holding any type of lock yet. Clear
* the lock stats.
* ---------------
*/
MemSet(result->holders,0, numLockModes * sizeof(*(lock->holders)));
result->nHolding = 0;
XID_PRINT_AUX("LockResolveConflicts: NOT FOUND", result);
} else {
XID_PRINT("LockResolveConflicts: found", result);
}
} }
Assert((result->nHolding >= 0) && (result->holders[lockmode] >= 0));
/*
* We can control runtime this option. Default is lockReadPriority=0
*/
if (!lockReadPriority)
{ {
/* ------------------------ /* ------------------------
* If someone with a greater priority is waiting for the lock, * If someone with a greater priority is waiting for the lock,
...@@ -705,8 +822,11 @@ LockResolveConflicts(LOCKMETHOD lockmethod, ...@@ -705,8 +822,11 @@ LockResolveConflicts(LOCKMETHOD lockmethod,
PROC_QUEUE *waitQueue = &(lock->waitProcs); PROC_QUEUE *waitQueue = &(lock->waitProcs);
PROC *topproc = (PROC *) MAKE_PTR(waitQueue->links.prev); PROC *topproc = (PROC *) MAKE_PTR(waitQueue->links.prev);
if (waitQueue->size && topproc->prio > myprio) if (waitQueue->size && topproc->prio > myprio) {
XID_PRINT("LockResolveConflicts: higher priority proc waiting",
result);
return STATUS_FOUND; return STATUS_FOUND;
}
} }
/* ---------------------------- /* ----------------------------
...@@ -721,12 +841,10 @@ LockResolveConflicts(LOCKMETHOD lockmethod, ...@@ -721,12 +841,10 @@ LockResolveConflicts(LOCKMETHOD lockmethod,
*/ */
if (!(LockMethodTable[lockmethod]->ctl->conflictTab[lockmode] & lock->mask)) if (!(LockMethodTable[lockmethod]->ctl->conflictTab[lockmode] & lock->mask))
{ {
result->holders[lockmode]++; result->holders[lockmode]++;
result->nHolding++; result->nHolding++;
XID_PRINT("LockResolveConflicts: no conflict", result);
XID_PRINT("Conflict Resolved: updated xid entry stats", result); Assert((result->nHolding > 0) && (result->holders[lockmode] > 0));
return (STATUS_OK); return (STATUS_OK);
} }
...@@ -736,6 +854,7 @@ LockResolveConflicts(LOCKMETHOD lockmethod, ...@@ -736,6 +854,7 @@ LockResolveConflicts(LOCKMETHOD lockmethod,
* that does not reflect our own locks. * that does not reflect our own locks.
* ------------------------ * ------------------------
*/ */
myHolders = result->holders;
bitmask = 0; bitmask = 0;
tmpMask = 2; tmpMask = 2;
for (i = 1; i <= numLockModes; i++, tmpMask <<= 1) for (i = 1; i <= numLockModes; i++, tmpMask <<= 1)
...@@ -753,27 +872,42 @@ LockResolveConflicts(LOCKMETHOD lockmethod, ...@@ -753,27 +872,42 @@ LockResolveConflicts(LOCKMETHOD lockmethod,
*/ */
if (!(LockMethodTable[lockmethod]->ctl->conflictTab[lockmode] & bitmask)) if (!(LockMethodTable[lockmethod]->ctl->conflictTab[lockmode] & bitmask))
{ {
/* no conflict. Get the lock and go on */ /* no conflict. Get the lock and go on */
result->holders[lockmode]++; result->holders[lockmode]++;
result->nHolding++; result->nHolding++;
XID_PRINT("LockResolveConflicts: resolved", result);
XID_PRINT("Conflict Resolved: updated xid entry stats", result); Assert((result->nHolding > 0) && (result->holders[lockmode] > 0));
return (STATUS_OK); return (STATUS_OK);
} }
XID_PRINT("LockResolveConflicts: conflicting", result);
return (STATUS_FOUND); return (STATUS_FOUND);
} }
/*
* GrantLock -- update the lock data structure to show
* the new lock holder.
*/
void
GrantLock(LOCK *lock, LOCKMODE lockmode)
{
lock->nActive++;
lock->activeHolders[lockmode]++;
lock->mask |= BITS_ON[lockmode];
LOCK_PRINT("GrantLock", lock, lockmode);
Assert((lock->nActive > 0) && (lock->activeHolders[lockmode] > 0));
Assert(lock->nActive <= lock->nHolding);
}
static int static int
WaitOnLock(LOCKMETHOD lockmethod, LOCK *lock, LOCKMODE lockmode) WaitOnLock(LOCKMETHOD lockmethod, LOCK *lock, LOCKMODE lockmode,
TransactionId xid)
{ {
PROC_QUEUE *waitQueue = &(lock->waitProcs); PROC_QUEUE *waitQueue = &(lock->waitProcs);
LOCKMETHODTABLE *lockMethodTable = LockMethodTable[lockmethod]; LOCKMETHODTABLE *lockMethodTable = LockMethodTable[lockmethod];
int prio = lockMethodTable->ctl->prio[lockmode]; int prio = lockMethodTable->ctl->prio[lockmode];
char old_status[64],
new_status[64];
Assert(lockmethod < NumLockMethods); Assert(lockmethod < NumLockMethods);
...@@ -785,27 +919,38 @@ WaitOnLock(LOCKMETHOD lockmethod, LOCK *lock, LOCKMODE lockmode) ...@@ -785,27 +919,38 @@ WaitOnLock(LOCKMETHOD lockmethod, LOCK *lock, LOCKMODE lockmode)
* synchronization for this queue. That will not be true if/when * synchronization for this queue. That will not be true if/when
* people can be deleted from the queue by a SIGINT or something. * people can be deleted from the queue by a SIGINT or something.
*/ */
LOCK_DUMP_AUX("WaitOnLock: sleeping on lock", lock, lockmode); LOCK_PRINT_AUX("WaitOnLock: sleeping on lock", lock, lockmode);
strcpy(old_status, PS_STATUS);
strcpy(new_status, PS_STATUS);
strcat(new_status, " waiting");
PS_SET_STATUS(new_status);
if (ProcSleep(waitQueue, if (ProcSleep(waitQueue,
lockMethodTable->ctl->masterLock, lockMethodTable->ctl->masterLock,
lockmode, lockmode,
prio, prio,
lock) != NO_ERROR) lock,
xid) != NO_ERROR)
{ {
/* ------------------- /* -------------------
* This could have happend as a result of a deadlock, see HandleDeadLock() * This could have happend as a result of a deadlock,
* Decrement the lock nHolding and holders fields as we are no longer * see HandleDeadLock().
* waiting on this lock. * Decrement the lock nHolding and holders fields as
* we are no longer waiting on this lock.
* ------------------- * -------------------
*/ */
lock->nHolding--; lock->nHolding--;
lock->holders[lockmode]--; lock->holders[lockmode]--;
LOCK_DUMP_AUX("WaitOnLock: aborting on lock", lock, lockmode); LOCK_PRINT_AUX("WaitOnLock: aborting on lock", lock, lockmode);
Assert((lock->nHolding >= 0) && (lock->holders[lockmode] >= 0));
Assert(lock->nActive <= lock->nHolding);
SpinRelease(lockMethodTable->ctl->masterLock); SpinRelease(lockMethodTable->ctl->masterLock);
elog(ERROR, "WaitOnLock: error on wakeup - Aborting this transaction"); elog(ERROR, "WaitOnLock: error on wakeup - Aborting this transaction");
/* not reached */
} }
LOCK_DUMP_AUX("WaitOnLock: wakeup on lock", lock, lockmode); PS_SET_STATUS(old_status);
LOCK_PRINT_AUX("WaitOnLock: wakeup on lock", lock, lockmode);
return (STATUS_OK); return (STATUS_OK);
} }
...@@ -829,25 +974,34 @@ LockRelease(LOCKMETHOD lockmethod, LOCKTAG *locktag, LOCKMODE lockmode) ...@@ -829,25 +974,34 @@ LockRelease(LOCKMETHOD lockmethod, LOCKTAG *locktag, LOCKMODE lockmode)
XIDLookupEnt *result, XIDLookupEnt *result,
item; item;
HTAB *xidTable; HTAB *xidTable;
TransactionId xid;
bool wakeupNeeded = true; bool wakeupNeeded = true;
int trace_flag;
#ifdef USER_LOCKS #ifdef USER_LOCKS
int is_user_lock; int is_user_lock;
is_user_lock = (lockmethod == 0); is_user_lock = (lockmethod == USER_LOCKMETHOD);
if (is_user_lock) if (is_user_lock)
{ {
lockmethod = 1; TPRINTF(TRACE_USERLOCKS, "LockRelease: user lock tag [%u,%u] %d",
#ifdef USER_LOCKS_DEBUG locktag->tupleId.ip_posid,
elog(NOTICE, "LockRelease: user lock tag [%u,%u] %d", ((locktag->tupleId.ip_blkid.bi_hi << 16) +
locktag->tupleId.ip_posid, locktag->tupleId.ip_blkid.bi_lo),
((locktag->tupleId.ip_blkid.bi_hi << 16) + lockmode);
locktag->tupleId.ip_blkid.bi_lo),
lockmode);
#endif
} }
#endif #endif
/* ???????? This must be changed when short term locks will be used */
locktag->lockmethod = lockmethod;
#ifdef USER_LOCKS
trace_flag = \
(lockmethod == USER_LOCKMETHOD) ? TRACE_USERLOCKS : TRACE_LOCKS;
#else
trace_flag = TRACE_LOCKS;
#endif
Assert(lockmethod < NumLockMethods); Assert(lockmethod < NumLockMethods);
lockMethodTable = LockMethodTable[lockmethod]; lockMethodTable = LockMethodTable[lockmethod];
if (!lockMethodTable) if (!lockMethodTable)
...@@ -859,34 +1013,18 @@ LockRelease(LOCKMETHOD lockmethod, LOCKTAG *locktag, LOCKMODE lockmode) ...@@ -859,34 +1013,18 @@ LockRelease(LOCKMETHOD lockmethod, LOCKTAG *locktag, LOCKMODE lockmode)
if (LockingIsDisabled) if (LockingIsDisabled)
return (TRUE); return (TRUE);
LOCK_PRINT("Release", locktag, lockmode);
masterLock = lockMethodTable->ctl->masterLock; masterLock = lockMethodTable->ctl->masterLock;
xidTable = lockMethodTable->xidHash;
SpinAcquire(masterLock); SpinAcquire(masterLock);
Assert(lockMethodTable->lockHash->hash == tag_hash);
lock = (LOCK *)
hash_search(lockMethodTable->lockHash, (Pointer) locktag, HASH_FIND_SAVE, &found);
#ifdef USER_LOCKS
/* /*
* If the entry is not found hash_search returns TRUE instead of NULL, * Find a lock with this tag
* so we must check it explicitly.
*/ */
if ((is_user_lock) && (lock == (LOCK *) TRUE)) Assert(lockMethodTable->lockHash->hash == tag_hash);
{ lock = (LOCK *) hash_search(lockMethodTable->lockHash, (Pointer) locktag,
SpinRelease(masterLock); HASH_FIND, &found);
elog(NOTICE, "LockRelease: there are no locks with this tag");
return (FALSE);
}
#endif
/* /*
* let the caller print its own error message, too. Do not * let the caller print its own error message, too. Do not elog(ERROR).
* elog(ERROR).
*/ */
if (!lock) if (!lock)
{ {
...@@ -898,52 +1036,20 @@ LockRelease(LOCKMETHOD lockmethod, LOCKTAG *locktag, LOCKMODE lockmode) ...@@ -898,52 +1036,20 @@ LockRelease(LOCKMETHOD lockmethod, LOCKTAG *locktag, LOCKMODE lockmode)
if (!found) if (!found)
{ {
SpinRelease(masterLock); SpinRelease(masterLock);
#ifdef USER_LOCKS
if (is_user_lock) {
TPRINTF(TRACE_USERLOCKS, "LockRelease: no lock with this tag");
return (FALSE);
}
#endif
elog(NOTICE, "LockRelease: locktable lookup failed, no lock"); elog(NOTICE, "LockRelease: locktable lookup failed, no lock");
return (FALSE); return (FALSE);
} }
LOCK_PRINT("LockRelease: found", lock, lockmode);
Assert((lock->nHolding > 0) && (lock->holders[lockmode] >= 0));
Assert((lock->nActive > 0) && (lock->activeHolders[lockmode] >= 0));
Assert(lock->nActive <= lock->nHolding);
Assert(lock->nHolding > 0);
#ifdef USER_LOCKS
/*
* If this is an user lock it can be removed only after checking that
* it was acquired by the current process, so this code is skipped and
* executed later.
*/
if (!is_user_lock)
{
#endif
/*
* fix the general lock stats
*/
lock->nHolding--;
lock->holders[lockmode]--;
lock->nActive--;
lock->activeHolders[lockmode]--;
Assert(lock->nActive >= 0);
if (!lock->nHolding)
{
/* ------------------
* if there's no one waiting in the queue,
* we just released the last lock.
* Delete it from the lock table.
* ------------------
*/
Assert(lockMethodTable->lockHash->hash == tag_hash);
lock = (LOCK *) hash_search(lockMethodTable->lockHash,
(Pointer) &(lock->tag),
HASH_REMOVE_SAVED,
&found);
Assert(lock && found);
wakeupNeeded = false;
}
#ifdef USER_LOCKS
}
#endif
/* ------------------ /* ------------------
* Zero out all of the tag bytes (this clears the padding bytes for long * Zero out all of the tag bytes (this clears the padding bytes for long
...@@ -951,42 +1057,102 @@ LockRelease(LOCKMETHOD lockmethod, LOCKTAG *locktag, LOCKMODE lockmode) ...@@ -951,42 +1057,102 @@ LockRelease(LOCKMETHOD lockmethod, LOCKTAG *locktag, LOCKMODE lockmode)
* ------------------ * ------------------
*/ */
MemSet(&item, 0, XID_TAGSIZE); MemSet(&item, 0, XID_TAGSIZE);
TransactionIdStore(GetCurrentTransactionId(), &item.tag.xid);
item.tag.lock = MAKE_OFFSET(lock); item.tag.lock = MAKE_OFFSET(lock);
#if 0 #ifdef USE_XIDTAG_LOCKMETHOD
item.tag.pid = MyPid; item.tag.lockmethod = lockmethod;
#endif #endif
#ifdef USER_LOCKS #ifdef USER_LOCKS
if (is_user_lock) if (is_user_lock) {
{
item.tag.pid = MyProcPid; item.tag.pid = MyProcPid;
item.tag.xid = 0; item.tag.xid = xid = 0;
#ifdef USER_LOCKS_DEBUG } else {
elog(NOTICE, "LockRelease: user lock xid [%d,%d,%d]", xid = GetCurrentTransactionId();
item.tag.lock, item.tag.pid, item.tag.xid); TransactionIdStore(xid, &item.tag.xid);
#endif
} }
#else
xid = GetCurrentTransactionId();
TransactionIdStore(xid, &item.tag.xid);
#endif #endif
if (!(result = (XIDLookupEnt *) hash_search(xidTable, /*
(Pointer) &item, * Find an xid entry with this tag
HASH_FIND_SAVE, */
&found)) xidTable = lockMethodTable->xidHash;
|| !found) result = (XIDLookupEnt *) hash_search(xidTable, (Pointer) &item,
HASH_FIND_SAVE, &found);
if (!result || !found)
{ {
SpinRelease(masterLock); SpinRelease(masterLock);
#ifdef USER_LOCKS #ifdef USER_LOCKS
if ((is_user_lock) && (result)) if (!found && is_user_lock) {
elog(NOTICE, "LockRelease: you don't have a lock on this tag"); TPRINTF(TRACE_USERLOCKS, "LockRelease: no lock with this tag");
else } else
elog(NOTICE, "LockRelease: find xid, table corrupted");
#else
elog(NOTICE, "LockReplace: xid table corrupted");
#endif #endif
elog(NOTICE, "LockReplace: xid table corrupted");
return (FALSE); return (FALSE);
} }
XID_PRINT("LockRelease: found", result);
Assert(result->tag.lock == MAKE_OFFSET(lock));
/*
* Check that we are actually holding a lock of the type we want
* to release.
*/
if (!(result->holders[lockmode] > 0)) {
SpinRelease(masterLock);
XID_PRINT_AUX("LockAcquire: WRONGTYPE", result);
elog(NOTICE, "LockRelease: you don't own a lock of type %s",
lock_types[lockmode]);
Assert(result->holders[lockmode] >= 0);
return (FALSE);
}
Assert(result->nHolding > 0);
/*
* fix the general lock stats
*/
lock->nHolding--;
lock->holders[lockmode]--;
lock->nActive--;
lock->activeHolders[lockmode]--;
/* --------------------------
* If there are still active locks of the type I just released, no one
* should be woken up. Whoever is asleep will still conflict
* with the remaining locks.
* --------------------------
*/
if (lock->activeHolders[lockmode])
{
wakeupNeeded = false;
}
else
{
/* change the conflict mask. No more of this lock type. */
lock->mask &= BITS_OFF[lockmode];
}
LOCK_PRINT("LockRelease: updated", lock, lockmode);
Assert((lock->nHolding >= 0) && (lock->holders[lockmode] >= 0));
Assert((lock->nActive >= 0) && (lock->activeHolders[lockmode] >= 0));
Assert(lock->nActive <= lock->nHolding);
if (!lock->nHolding)
{
/* ------------------
* if there's no one waiting in the queue,
* we just released the last lock.
* Delete it from the lock table.
* ------------------
*/
Assert(lockMethodTable->lockHash->hash == tag_hash);
lock = (LOCK *) hash_search(lockMethodTable->lockHash,
(Pointer) &(lock->tag),
HASH_REMOVE,
&found);
Assert(lock && found);
wakeupNeeded = false;
}
/* /*
* now check to see if I have any private locks. If I do, decrement * now check to see if I have any private locks. If I do, decrement
...@@ -994,8 +1160,8 @@ LockRelease(LOCKMETHOD lockmethod, LOCKTAG *locktag, LOCKMODE lockmode) ...@@ -994,8 +1160,8 @@ LockRelease(LOCKMETHOD lockmethod, LOCKTAG *locktag, LOCKMODE lockmode)
*/ */
result->holders[lockmode]--; result->holders[lockmode]--;
result->nHolding--; result->nHolding--;
XID_PRINT("LockRelease: updated", result);
XID_PRINT("LockRelease updated xid stats", result); Assert((result->nHolding >= 0) && (result->holders[lockmode] >= 0));
/* /*
* If this was my last hold on this lock, delete my entry in the XID * If this was my last hold on this lock, delete my entry in the XID
...@@ -1003,78 +1169,25 @@ LockRelease(LOCKMETHOD lockmethod, LOCKTAG *locktag, LOCKMODE lockmode) ...@@ -1003,78 +1169,25 @@ LockRelease(LOCKMETHOD lockmethod, LOCKTAG *locktag, LOCKMODE lockmode)
*/ */
if (!result->nHolding) if (!result->nHolding)
{ {
#ifdef USER_LOCKS if (result->queue.prev == INVALID_OFFSET) {
if (result->queue.prev == INVALID_OFFSET)
elog(NOTICE, "LockRelease: xid.prev == INVALID_OFFSET"); elog(NOTICE, "LockRelease: xid.prev == INVALID_OFFSET");
if (result->queue.next == INVALID_OFFSET) }
if (result->queue.next == INVALID_OFFSET) {
elog(NOTICE, "LockRelease: xid.next == INVALID_OFFSET"); elog(NOTICE, "LockRelease: xid.next == INVALID_OFFSET");
#endif }
if (result->queue.next != INVALID_OFFSET) if (result->queue.next != INVALID_OFFSET)
SHMQueueDelete(&result->queue); SHMQueueDelete(&result->queue);
if (!(result = (XIDLookupEnt *) XID_PRINT("LockRelease: deleting", result);
hash_search(xidTable, (Pointer) &item, HASH_REMOVE_SAVED, &found)) || result = (XIDLookupEnt *) hash_search(xidTable, (Pointer) &result,
!found) HASH_REMOVE_SAVED, &found);
if (!result || !found)
{ {
SpinRelease(masterLock); SpinRelease(masterLock);
#ifdef USER_LOCKS
elog(NOTICE, "LockRelease: remove xid, table corrupted"); elog(NOTICE, "LockRelease: remove xid, table corrupted");
#else
elog(NOTICE, "LockReplace: xid table corrupted");
#endif
return (FALSE); return (FALSE);
} }
} }
#ifdef USER_LOCKS
/*
* If this is an user lock remove it now, after the corresponding xid
* entry has been found and deleted.
*/
if (is_user_lock)
{
/*
* fix the general lock stats
*/
lock->nHolding--;
lock->holders[lockmode]--;
lock->nActive--;
lock->activeHolders[lockmode]--;
Assert(lock->nActive >= 0);
if (!lock->nHolding)
{
/* ------------------
* if there's no one waiting in the queue,
* we just released the last lock.
* Delete it from the lock table.
* ------------------
*/
Assert(lockMethodTable->lockHash->hash == tag_hash);
lock = (LOCK *) hash_search(lockMethodTable->lockHash,
(Pointer) &(lock->tag),
HASH_REMOVE,
&found);
Assert(lock && found);
wakeupNeeded = false;
}
}
#endif
/* --------------------------
* If there are still active locks of the type I just released, no one
* should be woken up. Whoever is asleep will still conflict
* with the remaining locks.
* --------------------------
*/
if (!(lock->activeHolders[lockmode]))
{
/* change the conflict mask. No more of this lock type. */
lock->mask &= BITS_OFF[lockmode];
}
if (wakeupNeeded) if (wakeupNeeded)
{ {
/* -------------------------- /* --------------------------
...@@ -1085,35 +1198,18 @@ LockRelease(LOCKMETHOD lockmethod, LOCKTAG *locktag, LOCKMODE lockmode) ...@@ -1085,35 +1198,18 @@ LockRelease(LOCKMETHOD lockmethod, LOCKTAG *locktag, LOCKMODE lockmode)
*/ */
ProcLockWakeup(&(lock->waitProcs), lockmethod, lock); ProcLockWakeup(&(lock->waitProcs), lockmethod, lock);
} }
else
{
LOCK_TPRINTF(lock, "LockRelease: no wakeup needed");
}
SpinRelease(masterLock); SpinRelease(masterLock);
return (TRUE); return (TRUE);
} }
/*
* GrantLock -- update the lock data structure to show
* the new lock holder.
*/
void
GrantLock(LOCK *lock, LOCKMODE lockmode)
{
lock->nActive++;
lock->activeHolders[lockmode]++;
lock->mask |= BITS_ON[lockmode];
}
#ifdef USER_LOCKS
/* /*
* LockReleaseAll -- Release all locks in a process lock queue. * LockReleaseAll -- Release all locks in a process lock queue.
*
* Note: This code is a little complicated by the presence in the
* same queue of user locks which can't be removed from the
* normal lock queue at the end of a transaction. They must
* however be removed when the backend exits.
* A dummy lockmethod 0 is used to indicate that we are releasing
* the user locks, from the code added to ProcKill().
*/ */
#endif
bool bool
LockReleaseAll(LOCKMETHOD lockmethod, SHM_QUEUE *lockQueue) LockReleaseAll(LOCKMETHOD lockmethod, SHM_QUEUE *lockQueue)
{ {
...@@ -1121,6 +1217,7 @@ LockReleaseAll(LOCKMETHOD lockmethod, SHM_QUEUE *lockQueue) ...@@ -1121,6 +1217,7 @@ LockReleaseAll(LOCKMETHOD lockmethod, SHM_QUEUE *lockQueue)
int done; int done;
XIDLookupEnt *xidLook = NULL; XIDLookupEnt *xidLook = NULL;
XIDLookupEnt *tmp = NULL; XIDLookupEnt *tmp = NULL;
XIDLookupEnt *result;
SHMEM_OFFSET end = MAKE_OFFSET(lockQueue); SHMEM_OFFSET end = MAKE_OFFSET(lockQueue);
SPINLOCK masterLock; SPINLOCK masterLock;
LOCKMETHODTABLE *lockMethodTable; LOCKMETHODTABLE *lockMethodTable;
...@@ -1128,45 +1225,52 @@ LockReleaseAll(LOCKMETHOD lockmethod, SHM_QUEUE *lockQueue) ...@@ -1128,45 +1225,52 @@ LockReleaseAll(LOCKMETHOD lockmethod, SHM_QUEUE *lockQueue)
numLockModes; numLockModes;
LOCK *lock; LOCK *lock;
bool found; bool found;
int trace_flag;
int xidtag_lockmethod;
#ifdef USER_LOCKS #ifdef USER_LOCKS
int is_user_lock_table, int is_user_lock_table,
count, count,
nskip; nleft;
is_user_lock_table = (lockmethod == 0); count = nleft = 0;
#ifdef USER_LOCKS_DEBUG
elog(NOTICE, "LockReleaseAll: lockmethod=%d, pid=%d", lockmethod, MyProcPid); is_user_lock_table = (lockmethod == USER_LOCKMETHOD);
#endif trace_flag = (lockmethod == 2) ? TRACE_USERLOCKS : TRACE_LOCKS;
if (is_user_lock_table) #else
lockmethod = 1; trace_flag = TRACE_LOCKS;
#endif #endif
TPRINTF(trace_flag, "LockReleaseAll: lockmethod=%d, pid=%d",
lockmethod, MyProcPid);
Assert(lockmethod < NumLockMethods); Assert(lockmethod < NumLockMethods);
lockMethodTable = LockMethodTable[lockmethod]; lockMethodTable = LockMethodTable[lockmethod];
if (!lockMethodTable) if (!lockMethodTable) {
elog(NOTICE, "LockAcquire: bad lockmethod %d", lockmethod);
return (FALSE); return (FALSE);
}
numLockModes = lockMethodTable->ctl->numLockModes;
masterLock = lockMethodTable->ctl->masterLock;
if (SHMQueueEmpty(lockQueue)) if (SHMQueueEmpty(lockQueue))
return TRUE; return TRUE;
#ifdef USER_LOCKS numLockModes = lockMethodTable->ctl->numLockModes;
masterLock = lockMethodTable->ctl->masterLock;
SpinAcquire(masterLock); SpinAcquire(masterLock);
#endif
SHMQueueFirst(lockQueue, (Pointer *) &xidLook, &xidLook->queue); SHMQueueFirst(lockQueue, (Pointer *) &xidLook, &xidLook->queue);
XID_PRINT("LockReleaseAll", xidLook);
#ifndef USER_LOCKS
SpinAcquire(masterLock);
#else
count = nskip = 0;
#endif
for (;;) for (;;)
{ {
/*
* Sometimes the queue appears to be messed up.
*/
if (count++ > 1000)
{
elog(NOTICE, "LockReleaseAll: xid loop detected, giving up");
nleft = 0;
break;
}
/* --------------------------- /* ---------------------------
* XXX Here we assume the shared memory queue is circular and * XXX Here we assume the shared memory queue is circular and
* that we know its internal structure. Should have some sort of * that we know its internal structure. Should have some sort of
...@@ -1176,72 +1280,73 @@ LockReleaseAll(LOCKMETHOD lockmethod, SHM_QUEUE *lockQueue) ...@@ -1176,72 +1280,73 @@ LockReleaseAll(LOCKMETHOD lockmethod, SHM_QUEUE *lockQueue)
done = (xidLook->queue.next == end); done = (xidLook->queue.next == end);
lock = (LOCK *) MAKE_PTR(xidLook->tag.lock); lock = (LOCK *) MAKE_PTR(xidLook->tag.lock);
LOCK_PRINT("LockReleaseAll", (&lock->tag), 0); xidtag_lockmethod = XIDENT_LOCKMETHOD(*xidLook);
if ((xidtag_lockmethod == lockmethod) || (trace_flag >= 2)) {
#ifdef USER_LOCKS XID_PRINT("LockReleaseAll", xidLook);
LOCK_PRINT("LockReleaseAll", lock, 0);
}
/* #ifdef USE_XIDTAG_LOCKMETHOD
* Sometimes the queue appears to be messed up. if (xidtag_lockmethod != LOCK_LOCKMETHOD(*lock))
*/ elog(NOTICE, "LockReleaseAll: xid/lock method mismatch: %d != %d",
if (count++ > 2000) xidtag_lockmethod, lock->tag.lockmethod);
{ #endif
elog(NOTICE, "LockReleaseAll: xid loop detected, giving up"); if ((xidtag_lockmethod != lockmethod) && (trace_flag >= 2)) {
nskip = 0; TPRINTF(trace_flag, "LockReleaseAll: skipping other table");
break; nleft++;
goto next_item;
} }
Assert(lock->nHolding > 0);
Assert(lock->nActive > 0);
Assert(lock->nActive <= lock->nHolding);
Assert(xidLook->nHolding >= 0);
Assert(xidLook->nHolding <= lock->nHolding);
#ifdef USER_LOCKS
if (is_user_lock_table) if (is_user_lock_table)
{ {
if ((xidLook->tag.pid == 0) || (xidLook->tag.xid != 0)) if ((xidLook->tag.pid == 0) || (xidLook->tag.xid != 0))
{ {
#ifdef USER_LOCKS_DEBUG TPRINTF(TRACE_USERLOCKS,
elog(NOTICE, "LockReleaseAll: skip normal lock [%d,%d,%d]", "LockReleaseAll: skiping normal lock [%d,%d,%d]",
xidLook->tag.lock, xidLook->tag.pid, xidLook->tag.xid); xidLook->tag.lock, xidLook->tag.pid, xidLook->tag.xid);
#endif nleft++;
nskip++;
goto next_item; goto next_item;
} }
if (xidLook->tag.pid != MyProcPid) if (xidLook->tag.pid != MyProcPid)
{ {
/* This should never happen */ /* Should never happen */
#ifdef USER_LOCKS_DEBUG
elog(NOTICE, elog(NOTICE,
"LockReleaseAll: skip other pid [%u,%u] [%d,%d,%d]", "LockReleaseAll: INVALID PID: [%u,%u] [%d,%d,%d]",
lock->tag.tupleId.ip_posid, lock->tag.tupleId.ip_posid,
((lock->tag.tupleId.ip_blkid.bi_hi << 16) + ((lock->tag.tupleId.ip_blkid.bi_hi << 16) +
lock->tag.tupleId.ip_blkid.bi_lo), lock->tag.tupleId.ip_blkid.bi_lo),
xidLook->tag.lock, xidLook->tag.pid, xidLook->tag.xid); xidLook->tag.lock, xidLook->tag.pid, xidLook->tag.xid);
#endif nleft++;
nskip++;
goto next_item; goto next_item;
} }
#ifdef USER_LOCKS_DEBUG TPRINTF(TRACE_USERLOCKS,
elog(NOTICE, "LockReleaseAll: releasing user lock [%u,%u] [%d,%d,%d]",
"LockReleaseAll: release user lock [%u,%u] [%d,%d,%d]", lock->tag.tupleId.ip_posid,
lock->tag.tupleId.ip_posid, ((lock->tag.tupleId.ip_blkid.bi_hi << 16) +
((lock->tag.tupleId.ip_blkid.bi_hi << 16) + lock->tag.tupleId.ip_blkid.bi_lo),
lock->tag.tupleId.ip_blkid.bi_lo), xidLook->tag.lock, xidLook->tag.pid, xidLook->tag.xid);
xidLook->tag.lock, xidLook->tag.pid, xidLook->tag.xid);
#endif
} }
else else
{ {
if ((xidLook->tag.pid != 0) || (xidLook->tag.xid == 0)) /* Can't check xidLook->tag.xid, can be 0 also for normal locks */
if (xidLook->tag.pid != 0)
{ {
#ifdef USER_LOCKS_DEBUG TPRINTF(TRACE_LOCKS,
elog(NOTICE, "LockReleaseAll: skiping user lock [%u,%u] [%d,%d,%d]",
"LockReleaseAll: skip user lock [%u,%u] [%d,%d,%d]", lock->tag.tupleId.ip_posid,
lock->tag.tupleId.ip_posid, ((lock->tag.tupleId.ip_blkid.bi_hi << 16) +
((lock->tag.tupleId.ip_blkid.bi_hi << 16) + lock->tag.tupleId.ip_blkid.bi_lo),
lock->tag.tupleId.ip_blkid.bi_lo), xidLook->tag.lock, xidLook->tag.pid, xidLook->tag.xid);
xidLook->tag.lock, xidLook->tag.pid, xidLook->tag.xid); nleft++;
#endif
nskip++;
goto next_item; goto next_item;
} }
#ifdef USER_LOCKS_DEBUG
elog(NOTICE, "LockReleaseAll: release normal lock [%d,%d,%d]",
xidLook->tag.lock, xidLook->tag.pid, xidLook->tag.xid);
#endif
} }
#endif #endif
...@@ -1251,16 +1356,20 @@ LockReleaseAll(LOCKMETHOD lockmethod, SHM_QUEUE *lockQueue) ...@@ -1251,16 +1356,20 @@ LockReleaseAll(LOCKMETHOD lockmethod, SHM_QUEUE *lockQueue)
*/ */
if (lock->nHolding != xidLook->nHolding) if (lock->nHolding != xidLook->nHolding)
{ {
lock->nHolding -= xidLook->nHolding;
lock->nActive -= xidLook->nHolding;
Assert(lock->nActive >= 0);
for (i = 1; i <= numLockModes; i++) for (i = 1; i <= numLockModes; i++)
{ {
Assert(xidLook->holders[i] >= 0);
lock->holders[i] -= xidLook->holders[i]; lock->holders[i] -= xidLook->holders[i];
lock->activeHolders[i] -= xidLook->holders[i]; lock->activeHolders[i] -= xidLook->holders[i];
Assert((lock->holders[i] >= 0) \
&& (lock->activeHolders[i] >= 0));
if (!lock->activeHolders[i]) if (!lock->activeHolders[i])
lock->mask &= BITS_OFF[i]; lock->mask &= BITS_OFF[i];
} }
lock->nHolding -= xidLook->nHolding;
lock->nActive -= xidLook->nHolding;
Assert((lock->nHolding >= 0) && (lock->nActive >= 0));
Assert(lock->nActive <= lock->nHolding);
} }
else else
{ {
...@@ -1270,16 +1379,30 @@ LockReleaseAll(LOCKMETHOD lockmethod, SHM_QUEUE *lockQueue) ...@@ -1270,16 +1379,30 @@ LockReleaseAll(LOCKMETHOD lockmethod, SHM_QUEUE *lockQueue)
* -------------- * --------------
*/ */
lock->nHolding = 0; lock->nHolding = 0;
/* Fix the lock status, just for next LOCK_PRINT message. */
for (i=1; i<=numLockModes; i++) {
Assert(lock->holders[i] == lock->activeHolders[i]);
lock->holders[i] = lock->activeHolders[i] = 0;
}
} }
LOCK_PRINT("LockReleaseAll: updated", lock, 0);
/*
* Remove the xid from the process lock queue
*/
SHMQueueDelete(&xidLook->queue);
/* ---------------- /* ----------------
* always remove the xidLookup entry, we're done with it now * always remove the xidLookup entry, we're done with it now
* ---------------- * ----------------
*/ */
#ifdef USER_LOCKS
SHMQueueDelete(&xidLook->queue); XID_PRINT("LockReleaseAll: deleting", xidLook);
#endif result = (XIDLookupEnt *) hash_search(lockMethodTable->xidHash,
if ((!hash_search(lockMethodTable->xidHash, (Pointer) xidLook, HASH_REMOVE, &found)) (Pointer) xidLook,
|| !found) HASH_REMOVE,
&found);
if (!result || !found)
{ {
SpinRelease(masterLock); SpinRelease(masterLock);
elog(NOTICE, "LockReleaseAll: xid table corrupted"); elog(NOTICE, "LockReleaseAll: xid table corrupted");
...@@ -1293,10 +1416,11 @@ LockReleaseAll(LOCKMETHOD lockmethod, SHM_QUEUE *lockQueue) ...@@ -1293,10 +1416,11 @@ LockReleaseAll(LOCKMETHOD lockmethod, SHM_QUEUE *lockQueue)
* the last lock. * the last lock.
* -------------------- * --------------------
*/ */
LOCK_PRINT("LockReleaseAll: deleting", lock, 0);
Assert(lockMethodTable->lockHash->hash == tag_hash); Assert(lockMethodTable->lockHash->hash == tag_hash);
lock = (LOCK *) lock = (LOCK *) hash_search(lockMethodTable->lockHash,
hash_search(lockMethodTable->lockHash, (Pointer) &(lock->tag), HASH_REMOVE, &found); (Pointer) &(lock->tag),
HASH_REMOVE, &found);
if ((!lock) || (!found)) if ((!lock) || (!found))
{ {
SpinRelease(masterLock); SpinRelease(masterLock);
...@@ -1324,15 +1448,18 @@ next_item: ...@@ -1324,15 +1448,18 @@ next_item:
SHMQueueFirst(&xidLook->queue, (Pointer *) &tmp, &tmp->queue); SHMQueueFirst(&xidLook->queue, (Pointer *) &tmp, &tmp->queue);
xidLook = tmp; xidLook = tmp;
} }
SpinRelease(masterLock);
#ifdef USER_LOCKS
/* /*
* Reinitialize the queue only if nothing has been left in. * Reinitialize the queue only if nothing has been left in.
*/ */
if (nskip == 0) if (nleft == 0) {
#endif TPRINTF(trace_flag, "LockReleaseAll: reinitializing lockQueue");
SHMQueueInit(lockQueue); SHMQueueInit(lockQueue);
}
SpinRelease(masterLock);
TPRINTF(trace_flag, "LockReleaseAll: done");
return TRUE; return TRUE;
} }
...@@ -1420,7 +1547,7 @@ DeadLockCheck(SHM_QUEUE *lockQueue, LOCK *findlock, bool skip_check) ...@@ -1420,7 +1547,7 @@ DeadLockCheck(SHM_QUEUE *lockQueue, LOCK *findlock, bool skip_check)
checked_procs[0] = MyProc; checked_procs[0] = MyProc;
nprocs = 1; nprocs = 1;
lockMethodTable = LockMethodTable[1]; lockMethodTable = LockMethodTable[DEFAULT_LOCKMETHOD];
xidTable = lockMethodTable->xidHash; xidTable = lockMethodTable->xidHash;
MemSet(&item, 0, XID_TAGSIZE); MemSet(&item, 0, XID_TAGSIZE);
...@@ -1456,7 +1583,7 @@ DeadLockCheck(SHM_QUEUE *lockQueue, LOCK *findlock, bool skip_check) ...@@ -1456,7 +1583,7 @@ DeadLockCheck(SHM_QUEUE *lockQueue, LOCK *findlock, bool skip_check)
done = (xidLook->queue.next == end); done = (xidLook->queue.next == end);
lock = (LOCK *) MAKE_PTR(xidLook->tag.lock); lock = (LOCK *) MAKE_PTR(xidLook->tag.lock);
LOCK_PRINT("DeadLockCheck", (&lock->tag), 0); LOCK_PRINT("DeadLockCheck", lock, 0);
/* /*
* This is our only check to see if we found the lock we want. * This is our only check to see if we found the lock we want.
...@@ -1560,9 +1687,190 @@ DeadLockCheck(SHM_QUEUE *lockQueue, LOCK *findlock, bool skip_check) ...@@ -1560,9 +1687,190 @@ DeadLockCheck(SHM_QUEUE *lockQueue, LOCK *findlock, bool skip_check)
return false; return false;
} }
/*
* Return an array with the pids of all processes owning a lock.
* This works only for user locks because normal locks have no
* pid information in the corresponding XIDLookupEnt.
*/
ArrayType *
LockOwners(LOCKMETHOD lockmethod, LOCKTAG *locktag)
{
XIDLookupEnt *xidLook = NULL;
SPINLOCK masterLock;
LOCK *lock;
SHMEM_OFFSET lock_offset;
int count = 0;
LOCKMETHODTABLE *lockMethodTable;
HTAB *xidTable;
bool found;
int ndims,
nitems,
hdrlen,
size;
int lbounds[1],
hbounds[1];
ArrayType *array;
int *data_ptr;
/* Assume that no one will modify the result */
static int empty_array[] = { 20, 1, 0, 0, 0 };
#ifdef USER_LOCKS
int is_user_lock;
is_user_lock = (lockmethod == USER_LOCKMETHOD);
if (is_user_lock)
{
TPRINTF(TRACE_USERLOCKS, "LockOwners: user lock tag [%u,%u]",
locktag->tupleId.ip_posid,
((locktag->tupleId.ip_blkid.bi_hi << 16) +
locktag->tupleId.ip_blkid.bi_lo));
}
#endif
/* This must be changed when short term locks will be used */
locktag->lockmethod = lockmethod;
Assert((lockmethod >= MIN_LOCKMETHOD) && (lockmethod < NumLockMethods));
lockMethodTable = LockMethodTable[lockmethod];
if (!lockMethodTable)
{
elog(NOTICE, "lockMethodTable is null in LockOwners");
return ((ArrayType *) &empty_array);
}
if (LockingIsDisabled)
{
return ((ArrayType *) &empty_array);
}
masterLock = lockMethodTable->ctl->masterLock;
SpinAcquire(masterLock);
/*
* Find a lock with this tag
*/
Assert(lockMethodTable->lockHash->hash == tag_hash);
lock = (LOCK *) hash_search(lockMethodTable->lockHash, (Pointer) locktag,
HASH_FIND, &found);
/*
* let the caller print its own error message, too. Do not elog(WARN).
*/
if (!lock)
{
SpinRelease(masterLock);
elog(NOTICE, "LockOwners: locktable corrupted");
return ((ArrayType *) &empty_array);
}
if (!found)
{
SpinRelease(masterLock);
#ifdef USER_LOCKS
if (is_user_lock) {
TPRINTF(TRACE_USERLOCKS, "LockOwners: no lock with this tag");
return ((ArrayType *) &empty_array);
}
#endif
elog(NOTICE, "LockOwners: locktable lookup failed, no lock");
return ((ArrayType *) &empty_array);
}
LOCK_PRINT("LockOwners: found", lock, 0);
Assert((lock->nHolding > 0) && (lock->nActive > 0));
Assert(lock->nActive <= lock->nHolding);
lock_offset = MAKE_OFFSET(lock);
/* Construct a 1-dimensional array */
ndims = 1;
hdrlen = ARR_OVERHEAD(ndims);
lbounds[0] = 0;
hbounds[0] = lock->nActive;
size = hdrlen + sizeof(int) * hbounds[0];
array = (ArrayType *) palloc(size);
MemSet(array, 0, size);
memmove((char *) array, (char *) &size, sizeof(int));
memmove((char *) ARR_NDIM_PTR(array), (char *) &ndims, sizeof(int));
memmove((char *) ARR_DIMS(array), (char *) hbounds, ndims * sizeof(int));
memmove((char *) ARR_LBOUND(array), (char *) lbounds, ndims * sizeof(int));
SET_LO_FLAG(false, array);
data_ptr = (int *) ARR_DATA_PTR(array);
xidTable = lockMethodTable->xidHash;
hash_seq(NULL);
nitems = 0;
while ((xidLook = (XIDLookupEnt *)hash_seq(xidTable)) &&
(xidLook != (XIDLookupEnt *)TRUE)) {
if (count++ > 1000) {
elog(NOTICE,"LockOwners: possible loop, giving up");
break;
}
if (xidLook->tag.pid == 0) {
XID_PRINT("LockOwners: no pid", xidLook);
continue;
}
if (!xidLook->tag.lock) {
XID_PRINT("LockOwners: NULL LOCK", xidLook);
continue;
}
if (xidLook->tag.lock != lock_offset) {
XID_PRINT("LockOwners: different lock", xidLook);
continue;
}
if (LOCK_LOCKMETHOD(*lock) != lockmethod) {
XID_PRINT("LockOwners: other table", xidLook);
continue;
}
if (xidLook->nHolding <= 0) {
XID_PRINT("LockOwners: not holding", xidLook);
continue;
}
if (nitems >= hbounds[0]) {
elog(NOTICE,"LockOwners: array size exceeded");
break;
}
/*
* Check that the holding process is still alive by sending
* him an unused (ignored) signal. If the kill fails the
* process is not alive.
*/
if ((xidLook->tag.pid != MyProcPid) \
&& (kill(xidLook->tag.pid, SIGCHLD)) != 0)
{
/* Return a negative pid to signal that process is dead */
data_ptr[nitems++] = - (xidLook->tag.pid);
XID_PRINT("LockOwners: not alive", xidLook);
/* XXX - TODO: remove this entry and update lock stats */
continue;
}
/* Found a process holding the lock */
XID_PRINT("LockOwners: holding", xidLook);
data_ptr[nitems++] = xidLook->tag.pid;
}
SpinRelease(masterLock);
/* Adjust the actual size of the array */
hbounds[0] = nitems;
size = hdrlen + sizeof(int) * hbounds[0];
memmove((char *) array, (char *) &size, sizeof(int));
memmove((char *) ARR_DIMS(array), (char *) hbounds, ndims * sizeof(int));
return (array);
}
#ifdef DEADLOCK_DEBUG #ifdef DEADLOCK_DEBUG
/* /*
* Dump all locks. Must have already acquired the masterLock. * Dump all locks in the proc->lockQueue. Must have already acquired
* the masterLock.
*/ */
void void
DumpLocks() DumpLocks()
...@@ -1577,9 +1885,8 @@ DumpLocks() ...@@ -1577,9 +1885,8 @@ DumpLocks()
SPINLOCK masterLock; SPINLOCK masterLock;
int numLockModes; int numLockModes;
LOCK *lock; LOCK *lock;
int count = 0;
count; int lockmethod = DEFAULT_LOCKMETHOD;
int lockmethod = 1;
LOCKMETHODTABLE *lockMethodTable; LOCKMETHODTABLE *lockMethodTable;
ShmemPIDLookup(MyProcPid, &location); ShmemPIDLookup(MyProcPid, &location);
...@@ -1604,11 +1911,18 @@ DumpLocks() ...@@ -1604,11 +1911,18 @@ DumpLocks()
SHMQueueFirst(lockQueue, (Pointer *) &xidLook, &xidLook->queue); SHMQueueFirst(lockQueue, (Pointer *) &xidLook, &xidLook->queue);
end = MAKE_OFFSET(lockQueue); end = MAKE_OFFSET(lockQueue);
LOCK_DUMP("DumpLocks", MyProc->waitLock, 0); if (MyProc->waitLock) {
XID_PRINT("DumpLocks", xidLook); LOCK_PRINT_AUX("DumpLocks: waiting on", MyProc->waitLock, 0);
}
for (count = 0;;) for (;;)
{ {
if (count++ > 2000)
{
elog(NOTICE, "DumpLocks: xid loop detected, giving up");
break;
}
/* --------------------------- /* ---------------------------
* XXX Here we assume the shared memory queue is circular and * XXX Here we assume the shared memory queue is circular and
* that we know its internal structure. Should have some sort of * that we know its internal structure. Should have some sort of
...@@ -1618,19 +1932,68 @@ DumpLocks() ...@@ -1618,19 +1932,68 @@ DumpLocks()
done = (xidLook->queue.next == end); done = (xidLook->queue.next == end);
lock = (LOCK *) MAKE_PTR(xidLook->tag.lock); lock = (LOCK *) MAKE_PTR(xidLook->tag.lock);
LOCK_DUMP("DumpLocks", lock, 0); XID_PRINT_AUX("DumpLocks", xidLook);
LOCK_PRINT_AUX("DumpLocks", lock, 0);
if (count++ > 2000)
{
elog(NOTICE, "DumpLocks: xid loop detected, giving up");
break;
}
if (done) if (done)
break; break;
SHMQueueFirst(&xidLook->queue, (Pointer *) &tmp, &tmp->queue); SHMQueueFirst(&xidLook->queue, (Pointer *) &tmp, &tmp->queue);
xidLook = tmp; xidLook = tmp;
} }
} }
/*
* Dump all postgres locks. Must have already acquired the masterLock.
*/
void
DumpAllLocks()
{
SHMEM_OFFSET location;
PROC *proc;
XIDLookupEnt *xidLook = NULL;
LOCK *lock;
int pid;
int count = 0;
int lockmethod = DEFAULT_LOCKMETHOD;
LOCKMETHODTABLE *lockMethodTable;
HTAB *xidTable;
pid = getpid();
ShmemPIDLookup(pid,&location);
if (location == INVALID_OFFSET)
return;
proc = (PROC *) MAKE_PTR(location);
if (proc != MyProc)
return;
Assert(lockmethod < NumLockMethods);
lockMethodTable = LockMethodTable[lockmethod];
if (!lockMethodTable)
return;
xidTable = lockMethodTable->xidHash;
if (MyProc->waitLock) {
LOCK_PRINT_AUX("DumpAllLocks: waiting on", MyProc->waitLock,0);
}
hash_seq(NULL);
while ((xidLook = (XIDLookupEnt *)hash_seq(xidTable)) &&
(xidLook != (XIDLookupEnt *)TRUE)) {
XID_PRINT_AUX("DumpAllLocks", xidLook);
if (xidLook->tag.lock) {
lock = (LOCK *) MAKE_PTR(xidLook->tag.lock);
LOCK_PRINT_AUX("DumpAllLocks", lock, 0);
} else {
elog(DEBUG, "DumpAllLocks: xidLook->tag.lock = NULL");
}
if (count++ > 2000) {
elog(NOTICE,"DumpAllLocks: possible loop, giving up");
break;
}
}
}
#endif #endif
...@@ -12,7 +12,7 @@ ...@@ -12,7 +12,7 @@
* *
* *
* IDENTIFICATION * IDENTIFICATION
* $Header: /cvsroot/pgsql/src/backend/storage/lmgr/Attic/multi.c,v 1.22 1998/08/19 02:02:44 momjian Exp $ * $Header: /cvsroot/pgsql/src/backend/storage/lmgr/Attic/multi.c,v 1.23 1998/08/25 21:20:28 scrappy Exp $
* *
* NOTES: * NOTES:
* (1) The lock.c module assumes that the caller here is doing * (1) The lock.c module assumes that the caller here is doing
...@@ -29,12 +29,10 @@ ...@@ -29,12 +29,10 @@
#include "utils/rel.h" #include "utils/rel.h"
#include "miscadmin.h" /* MyDatabaseId */ #include "miscadmin.h" /* MyDatabaseId */
static bool static bool MultiAcquire(LOCKMETHOD lockmethod, LOCKTAG *tag,
MultiAcquire(LOCKMETHOD lockmethod, LOCKTAG *tag, LOCKMODE lockmode, LOCKMODE lockmode, PG_LOCK_LEVEL level);
PG_LOCK_LEVEL level); static bool MultiRelease(LOCKMETHOD lockmethod, LOCKTAG *tag,
static bool LOCKMODE lockmode, PG_LOCK_LEVEL level);
MultiRelease(LOCKMETHOD lockmethod, LOCKTAG *tag, LOCKMODE lockmode,
PG_LOCK_LEVEL level);
#ifdef LowLevelLocking #ifdef LowLevelLocking
...@@ -130,6 +128,7 @@ static int MultiPrios[] = { ...@@ -130,6 +128,7 @@ static int MultiPrios[] = {
* lock table is ONE lock table, not three. * lock table is ONE lock table, not three.
*/ */
LOCKMETHOD MultiTableId = (LOCKMETHOD) NULL; LOCKMETHOD MultiTableId = (LOCKMETHOD) NULL;
LOCKMETHOD LongTermTableId = (LOCKMETHOD) NULL;
#ifdef NOT_USED #ifdef NOT_USED
LOCKMETHOD ShortTermTableId = (LOCKMETHOD) NULL; LOCKMETHOD ShortTermTableId = (LOCKMETHOD) NULL;
#endif #endif
...@@ -150,12 +149,25 @@ InitMultiLevelLocks() ...@@ -150,12 +149,25 @@ InitMultiLevelLocks()
/* ----------------------- /* -----------------------
* No short term lock table for now. -Jeff 15 July 1991 * No short term lock table for now. -Jeff 15 July 1991
* *
* ShortTermTableId = LockTableRename(lockmethod); * ShortTermTableId = LockMethodTableRename(lockmethod);
* if (! (ShortTermTableId)) { * if (! (ShortTermTableId)) {
* elog(ERROR,"InitMultiLocks: couldnt rename lock table"); * elog(ERROR,"InitMultiLocks: couldnt rename lock table");
* } * }
* ----------------------- * -----------------------
*/ */
#ifdef USER_LOCKS
/*
* Allocate another tableId for long-term locks
*/
LongTermTableId = LockMethodTableRename(MultiTableId);
if (!(LongTermTableId))
{
elog(ERROR,
"InitMultiLevelLocks: couldn't rename long-term lock table");
}
#endif
return MultiTableId; return MultiTableId;
} }
......
...@@ -7,7 +7,7 @@ ...@@ -7,7 +7,7 @@
* *
* *
* IDENTIFICATION * IDENTIFICATION
* $Header: /cvsroot/pgsql/src/backend/storage/lmgr/proc.c,v 1.40 1998/07/27 19:38:15 vadim Exp $ * $Header: /cvsroot/pgsql/src/backend/storage/lmgr/proc.c,v 1.41 1998/08/25 21:20:29 scrappy Exp $
* *
*------------------------------------------------------------------------- *-------------------------------------------------------------------------
*/ */
...@@ -46,7 +46,7 @@ ...@@ -46,7 +46,7 @@
* This is so that we can support more backends. (system-wide semaphore * This is so that we can support more backends. (system-wide semaphore
* sets run out pretty fast.) -ay 4/95 * sets run out pretty fast.) -ay 4/95
* *
* $Header: /cvsroot/pgsql/src/backend/storage/lmgr/proc.c,v 1.40 1998/07/27 19:38:15 vadim Exp $ * $Header: /cvsroot/pgsql/src/backend/storage/lmgr/proc.c,v 1.41 1998/08/25 21:20:29 scrappy Exp $
*/ */
#include <sys/time.h> #include <sys/time.h>
#include <unistd.h> #include <unistd.h>
...@@ -75,10 +75,13 @@ ...@@ -75,10 +75,13 @@
#include "storage/shmem.h" #include "storage/shmem.h"
#include "storage/spin.h" #include "storage/spin.h"
#include "storage/proc.h" #include "storage/proc.h"
#include "utils/trace.h"
static void HandleDeadLock(int sig); static void HandleDeadLock(int sig);
static PROC *ProcWakeup(PROC *proc, int errType); static PROC *ProcWakeup(PROC *proc, int errType);
#define DeadlockCheckTimer pg_options[OPT_DEADLOCKTIMEOUT]
/* -------------------- /* --------------------
* Spin lock for manipulating the shared process data structure: * Spin lock for manipulating the shared process data structure:
* ProcGlobal.... Adding an extra spin lock seemed like the smallest * ProcGlobal.... Adding an extra spin lock seemed like the smallest
...@@ -247,10 +250,7 @@ InitProcess(IPCKey key) ...@@ -247,10 +250,7 @@ InitProcess(IPCKey key)
*/ */
SpinRelease(ProcStructLock); SpinRelease(ProcStructLock);
MyProc->pid = 0;
#if 0
MyProc->pid = MyProcPid; MyProc->pid = MyProcPid;
#endif
MyProc->xid = InvalidTransactionId; MyProc->xid = InvalidTransactionId;
#ifdef LowLevelLocking #ifdef LowLevelLocking
MyProc->xmin = InvalidTransactionId; MyProc->xmin = InvalidTransactionId;
...@@ -361,10 +361,13 @@ ProcKill(int exitStatus, int pid) ...@@ -361,10 +361,13 @@ ProcKill(int exitStatus, int pid)
* --------------- * ---------------
*/ */
ProcReleaseSpins(proc); ProcReleaseSpins(proc);
LockReleaseAll(1, &proc->lockQueue); LockReleaseAll(DEFAULT_LOCKMETHOD, &proc->lockQueue);
#ifdef USER_LOCKS #ifdef USER_LOCKS
LockReleaseAll(0, &proc->lockQueue); /*
* Assume we have a second lock table.
*/
LockReleaseAll(USER_LOCKMETHOD, &proc->lockQueue);
#endif #endif
/* ---------------- /* ----------------
...@@ -437,11 +440,12 @@ ProcQueueInit(PROC_QUEUE *queue) ...@@ -437,11 +440,12 @@ ProcQueueInit(PROC_QUEUE *queue)
* NOTES: The process queue is now a priority queue for locking. * NOTES: The process queue is now a priority queue for locking.
*/ */
int int
ProcSleep(PROC_QUEUE *waitQueue, ProcSleep(PROC_QUEUE *waitQueue, /* lock->waitProcs */
SPINLOCK spinlock, SPINLOCK spinlock,
int token, int token, /* lockmode */
int prio, int prio,
LOCK *lock) LOCK *lock,
TransactionId xid) /* needed by user locks, see below */
{ {
int i; int i;
PROC *proc; PROC *proc;
...@@ -470,7 +474,6 @@ ProcSleep(PROC_QUEUE *waitQueue, ...@@ -470,7 +474,6 @@ ProcSleep(PROC_QUEUE *waitQueue,
proc = (PROC *) MAKE_PTR(waitQueue->links.prev); proc = (PROC *) MAKE_PTR(waitQueue->links.prev);
/* If we are a reader, and they are writers, skip past them */ /* If we are a reader, and they are writers, skip past them */
for (i = 0; i < waitQueue->size && proc->prio > prio; i++) for (i = 0; i < waitQueue->size && proc->prio > prio; i++)
proc = (PROC *) MAKE_PTR(proc->links.prev); proc = (PROC *) MAKE_PTR(proc->links.prev);
...@@ -482,12 +485,22 @@ ProcSleep(PROC_QUEUE *waitQueue, ...@@ -482,12 +485,22 @@ ProcSleep(PROC_QUEUE *waitQueue,
MyProc->token = token; MyProc->token = token;
MyProc->waitLock = lock; MyProc->waitLock = lock;
#ifdef USER_LOCKS
/* -------------------
* Currently, we only need this for the ProcWakeup routines.
* This must be 0 for user lock, so we can't just use the value
* from GetCurrentTransactionId().
* -------------------
*/
TransactionIdStore(xid, &MyProc->xid);
#else
#ifndef LowLevelLocking #ifndef LowLevelLocking
/* ------------------- /* -------------------
* currently, we only need this for the ProcWakeup routines * currently, we only need this for the ProcWakeup routines
* ------------------- * -------------------
*/ */
TransactionIdStore((TransactionId) GetCurrentTransactionId(), &MyProc->xid); TransactionIdStore((TransactionId) GetCurrentTransactionId(), &MyProc->xid);
#endif
#endif #endif
/* ------------------- /* -------------------
...@@ -510,7 +523,8 @@ ProcSleep(PROC_QUEUE *waitQueue, ...@@ -510,7 +523,8 @@ ProcSleep(PROC_QUEUE *waitQueue,
* -------------- * --------------
*/ */
MemSet(&timeval, 0, sizeof(struct itimerval)); MemSet(&timeval, 0, sizeof(struct itimerval));
timeval.it_value.tv_sec = DEADLOCK_CHECK_TIMER; timeval.it_value.tv_sec = \
(DeadlockCheckTimer ? DeadlockCheckTimer : DEADLOCK_CHECK_TIMER);
do do
{ {
...@@ -525,7 +539,8 @@ ProcSleep(PROC_QUEUE *waitQueue, ...@@ -525,7 +539,8 @@ ProcSleep(PROC_QUEUE *waitQueue,
* the semaphore implementation. * the semaphore implementation.
* -------------- * --------------
*/ */
IpcSemaphoreLock(MyProc->sem.semId, MyProc->sem.semNum, IpcExclusiveLock); IpcSemaphoreLock(MyProc->sem.semId, MyProc->sem.semNum,
IpcExclusiveLock);
} while (MyProc->errType == STATUS_NOT_FOUND); /* sleep after deadlock } while (MyProc->errType == STATUS_NOT_FOUND); /* sleep after deadlock
* check */ * check */
...@@ -534,8 +549,6 @@ ProcSleep(PROC_QUEUE *waitQueue, ...@@ -534,8 +549,6 @@ ProcSleep(PROC_QUEUE *waitQueue,
* --------------- * ---------------
*/ */
timeval.it_value.tv_sec = 0; timeval.it_value.tv_sec = 0;
if (setitimer(ITIMER_REAL, &timeval, &dummy)) if (setitimer(ITIMER_REAL, &timeval, &dummy))
elog(FATAL, "ProcSleep: Unable to diable timer for process wakeup"); elog(FATAL, "ProcSleep: Unable to diable timer for process wakeup");
...@@ -546,6 +559,11 @@ ProcSleep(PROC_QUEUE *waitQueue, ...@@ -546,6 +559,11 @@ ProcSleep(PROC_QUEUE *waitQueue,
*/ */
SpinAcquire(spinlock); SpinAcquire(spinlock);
#ifdef LOCK_MGR_DEBUG
/* Just to get meaningful debug messages from DumpLocks() */
MyProc->waitLock = (LOCK *)NULL;
#endif
return (MyProc->errType); return (MyProc->errType);
} }
...@@ -589,17 +607,39 @@ ProcLockWakeup(PROC_QUEUE *queue, LOCKMETHOD lockmethod, LOCK *lock) ...@@ -589,17 +607,39 @@ ProcLockWakeup(PROC_QUEUE *queue, LOCKMETHOD lockmethod, LOCK *lock)
{ {
PROC *proc; PROC *proc;
int count; int count;
int trace_flag;
int last_locktype = -1;
int queue_size = queue->size;
Assert(queue->size >= 0);
if (!queue->size) if (!queue->size)
return (STATUS_NOT_FOUND); return (STATUS_NOT_FOUND);
proc = (PROC *) MAKE_PTR(queue->links.prev); proc = (PROC *) MAKE_PTR(queue->links.prev);
count = 0; count = 0;
while ((LockResolveConflicts(lockmethod, while ((queue_size--) && (proc))
{
/*
* This proc will conflict as the previous one did, don't even try.
*/
if (proc->token == last_locktype)
{
continue;
}
/*
* This proc conflicts with locks held by others, ignored.
*/
if (LockResolveConflicts(lockmethod,
lock, lock,
proc->token, proc->token,
proc->xid) == STATUS_OK)) proc->xid,
{ (XIDLookupEnt *) NULL) != STATUS_OK)
{
last_locktype = proc->token;
continue;
}
/* /*
* there was a waiting process, grant it the lock before waking it * there was a waiting process, grant it the lock before waking it
...@@ -608,24 +648,34 @@ ProcLockWakeup(PROC_QUEUE *queue, LOCKMETHOD lockmethod, LOCK *lock) ...@@ -608,24 +648,34 @@ ProcLockWakeup(PROC_QUEUE *queue, LOCKMETHOD lockmethod, LOCK *lock)
* time that the awoken process begins executing again. * time that the awoken process begins executing again.
*/ */
GrantLock(lock, proc->token); GrantLock(lock, proc->token);
queue->size--;
/* /*
* ProcWakeup removes proc from the lock waiting process queue and * ProcWakeup removes proc from the lock waiting process queue and
* returns the next proc in chain. * returns the next proc in chain.
*/ */
proc = ProcWakeup(proc, NO_ERROR);
count++; count++;
if (!proc || queue->size == 0) queue->size--;
break; proc = ProcWakeup(proc, NO_ERROR);
} }
Assert(queue->size >= 0);
if (count) if (count)
return (STATUS_OK); return (STATUS_OK);
else else {
/* Something is still blocking us. May have deadlocked. */ /* Something is still blocking us. May have deadlocked. */
trace_flag = (lock->tag.lockmethod == USER_LOCKMETHOD) ? \
TRACE_USERLOCKS : TRACE_LOCKS;
TPRINTF(trace_flag,
"ProcLockWakeup: lock(%x) can't wake up any process",
MAKE_OFFSET(lock));
#ifdef DEADLOCK_DEBUG
if (pg_options[trace_flag] >= 2)
DumpAllLocks();
#endif
return (STATUS_NOT_FOUND); return (STATUS_NOT_FOUND);
}
} }
void void
...@@ -685,7 +735,7 @@ HandleDeadLock(int sig) ...@@ -685,7 +735,7 @@ HandleDeadLock(int sig)
} }
#ifdef DEADLOCK_DEBUG #ifdef DEADLOCK_DEBUG
DumpLocks(); DumpAllLocks();
#endif #endif
if (!DeadLockCheck(&(MyProc->lockQueue), MyProc->waitLock, true)) if (!DeadLockCheck(&(MyProc->lockQueue), MyProc->waitLock, true))
...@@ -711,7 +761,8 @@ HandleDeadLock(int sig) ...@@ -711,7 +761,8 @@ HandleDeadLock(int sig)
* I was awoken by a signal, not by someone unlocking my semaphore. * I was awoken by a signal, not by someone unlocking my semaphore.
* ------------------ * ------------------
*/ */
IpcSemaphoreUnlock(MyProc->sem.semId, MyProc->sem.semNum, IpcExclusiveLock); IpcSemaphoreUnlock(MyProc->sem.semId, MyProc->sem.semNum,
IpcExclusiveLock);
/* ------------- /* -------------
* Set MyProc->errType to STATUS_ERROR so that we abort after * Set MyProc->errType to STATUS_ERROR so that we abort after
......
...@@ -6,7 +6,7 @@ ...@@ -6,7 +6,7 @@
* *
* Copyright (c) 1994, Regents of the University of California * Copyright (c) 1994, Regents of the University of California
* *
* $Id: lock.h,v 1.16 1998/08/01 15:26:37 vadim Exp $ * $Id: lock.h,v 1.17 1998/08/25 21:20:31 scrappy Exp $
* *
*------------------------------------------------------------------------- *-------------------------------------------------------------------------
*/ */
...@@ -15,6 +15,8 @@ ...@@ -15,6 +15,8 @@
#include <storage/shmem.h> #include <storage/shmem.h>
#include <storage/itemptr.h> #include <storage/itemptr.h>
#include <storage/sinvaladt.h>
#include <utils/array.h>
extern SPINLOCK LockMgrLock; extern SPINLOCK LockMgrLock;
typedef int MASK; typedef int MASK;
...@@ -32,7 +34,7 @@ typedef int MASK; ...@@ -32,7 +34,7 @@ typedef int MASK;
* NLOCKENTS - The maximum number of lock entries in the lock table. * NLOCKENTS - The maximum number of lock entries in the lock table.
* ---------------------- * ----------------------
*/ */
#define NBACKENDS 50 #define NBACKENDS MaxBackendId
#define NLOCKS_PER_XACT 40 #define NLOCKS_PER_XACT 40
#define NLOCKENTS NLOCKS_PER_XACT*NBACKENDS #define NLOCKENTS NLOCKS_PER_XACT*NBACKENDS
...@@ -51,9 +53,14 @@ typedef int LOCKMETHOD; ...@@ -51,9 +53,14 @@ typedef int LOCKMETHOD;
* CreateSpinLocks() or the number of shared memory locations allocated * CreateSpinLocks() or the number of shared memory locations allocated
* for lock table spin locks in the case of machines with TAS instructions. * for lock table spin locks in the case of machines with TAS instructions.
*/ */
#define MAX_LOCK_METHODS 2 #define MAX_LOCK_METHODS 3
#define INVALID_TABLEID 0 #define INVALID_TABLEID 0
#define INVALID_LOCKMETHOD INVALID_TABLEID
#define DEFAULT_LOCKMETHOD 1
#define USER_LOCKMETHOD 2
#define MIN_LOCKMETHOD DEFAULT_LOCKMETHOD
/*typedef struct LOCK LOCK; */ /*typedef struct LOCK LOCK; */
...@@ -63,9 +70,11 @@ typedef struct LTAG ...@@ -63,9 +70,11 @@ typedef struct LTAG
Oid relId; Oid relId;
Oid dbId; Oid dbId;
ItemPointerData tupleId; ItemPointerData tupleId;
uint16 lockmethod; /* needed by user locks */
} LOCKTAG; } LOCKTAG;
#define TAGSIZE (sizeof(LOCKTAG)) #define TAGSIZE (sizeof(LOCKTAG))
#define LOCKTAG_LOCKMETHOD(locktag) ((locktag).lockmethod)
/* This is the control structure for a lock table. It /* This is the control structure for a lock table. It
* lives in shared memory: * lives in shared memory:
...@@ -143,8 +152,18 @@ typedef struct XIDTAG ...@@ -143,8 +152,18 @@ typedef struct XIDTAG
SHMEM_OFFSET lock; SHMEM_OFFSET lock;
int pid; int pid;
TransactionId xid; TransactionId xid;
#ifdef USE_XIDTAG_LOCKMETHOD
uint16 lockmethod; /* for debug or consistency checking */
#endif
} XIDTAG; } XIDTAG;
#ifdef USE_XIDTAG_LOCKMETHOD
#define XIDTAG_LOCKMETHOD(xidtag) ((xidtag).lockmethod)
#else
#define XIDTAG_LOCKMETHOD(xidtag) \
(((LOCK*) MAKE_PTR((xidtag).lock))->tag.lockmethod)
#endif
typedef struct XIDLookupEnt typedef struct XIDLookupEnt
{ {
/* tag */ /* tag */
...@@ -157,6 +176,7 @@ typedef struct XIDLookupEnt ...@@ -157,6 +176,7 @@ typedef struct XIDLookupEnt
} XIDLookupEnt; } XIDLookupEnt;
#define XID_TAGSIZE (sizeof(XIDTAG)) #define XID_TAGSIZE (sizeof(XIDTAG))
#define XIDENT_LOCKMETHOD(xident) (XIDTAG_LOCKMETHOD((xident).tag))
/* originally in procq.h */ /* originally in procq.h */
typedef struct PROC_QUEUE typedef struct PROC_QUEUE
...@@ -191,14 +211,16 @@ typedef struct LOCK ...@@ -191,14 +211,16 @@ typedef struct LOCK
int nActive; int nActive;
} LOCK; } LOCK;
#define LockGetLock_nHolders(l) l->nHolders #define LOCK_LOCKMETHOD(lock) (LOCKTAG_LOCKMETHOD((lock).tag))
#define LockGetLock_nHolders(l) l->nHolders
#ifdef NOT_USED
#define LockDecrWaitHolders(lock, lockmode) \ #define LockDecrWaitHolders(lock, lockmode) \
( \ ( \
lock->nHolding--, \ lock->nHolding--, \
lock->holders[lockmode]-- \ lock->holders[lockmode]-- \
) )
#endif
#define LockLockTable() SpinAcquire(LockMgrLock); #define LockLockTable() SpinAcquire(LockMgrLock);
#define UnlockLockTable() SpinRelease(LockMgrLock); #define UnlockLockTable() SpinRelease(LockMgrLock);
...@@ -209,23 +231,27 @@ extern SPINLOCK LockMgrLock; ...@@ -209,23 +231,27 @@ extern SPINLOCK LockMgrLock;
*/ */
extern void InitLocks(void); extern void InitLocks(void);
extern void LockDisable(int status); extern void LockDisable(int status);
extern LOCKMETHOD extern LOCKMETHOD LockMethodTableInit(char *tabName, MASK *conflictsP,
LockMethodTableInit(char *tabName, MASK *conflictsP, int *prioP, int *prioP, int numModes);
int numModes); extern LOCKMETHOD LockMethodTableRename(LOCKMETHOD lockmethod);
extern bool LockAcquire(LOCKMETHOD lockmethod, LOCKTAG *locktag, LOCKMODE lockmode); extern bool LockAcquire(LOCKMETHOD lockmethod, LOCKTAG *locktag,
extern int LOCKMODE lockmode);
LockResolveConflicts(LOCKMETHOD lockmethod, LOCK *lock, LOCKMODE lockmode, extern int LockResolveConflicts(LOCKMETHOD lockmethod, LOCK *lock,
TransactionId xid); LOCKMODE lockmode, TransactionId xid,
extern bool LockRelease(LOCKMETHOD lockmethod, LOCKTAG *locktag, LOCKMODE lockmode); XIDLookupEnt *xidentP);
extern bool LockRelease(LOCKMETHOD lockmethod, LOCKTAG *locktag,
LOCKMODE lockmode);
extern void GrantLock(LOCK *lock, LOCKMODE lockmode); extern void GrantLock(LOCK *lock, LOCKMODE lockmode);
extern bool LockReleaseAll(LOCKMETHOD lockmethod, SHM_QUEUE *lockQueue); extern bool LockReleaseAll(LOCKMETHOD lockmethod, SHM_QUEUE *lockQueue);
extern int LockShmemSize(void); extern int LockShmemSize(void);
extern bool LockingDisabled(void); extern bool LockingDisabled(void);
extern bool DeadLockCheck(SHM_QUEUE *lockQueue, LOCK *findlock, bool skip_check); extern bool DeadLockCheck(SHM_QUEUE *lockQueue, LOCK *findlock,
bool skip_check);
ArrayType* LockOwners(LOCKMETHOD lockmethod, LOCKTAG *locktag);
#ifdef DEADLOCK_DEBUG #ifdef DEADLOCK_DEBUG
extern void DumpLocks(void); extern void DumpLocks(void);
extern void DumpAllLocks(void);
#endif #endif
#endif /* LOCK_H */ #endif /* LOCK_H */
...@@ -6,7 +6,7 @@ ...@@ -6,7 +6,7 @@
* *
* Copyright (c) 1994, Regents of the University of California * Copyright (c) 1994, Regents of the University of California
* *
* $Id: proc.h,v 1.13 1998/07/27 19:38:38 vadim Exp $ * $Id: proc.h,v 1.14 1998/08/25 21:20:32 scrappy Exp $
* *
*------------------------------------------------------------------------- *-------------------------------------------------------------------------
*/ */
...@@ -105,10 +105,10 @@ extern bool ProcRemove(int pid); ...@@ -105,10 +105,10 @@ extern bool ProcRemove(int pid);
/* make static in storage/lmgr/proc.c -- jolly */ /* make static in storage/lmgr/proc.c -- jolly */
extern void ProcQueueInit(PROC_QUEUE *queue); extern void ProcQueueInit(PROC_QUEUE *queue);
extern int extern int ProcSleep(PROC_QUEUE *queue, SPINLOCK spinlock, int token,
ProcSleep(PROC_QUEUE *queue, SPINLOCK spinlock, int token, int prio, LOCK *lock, TransactionId xid);
int prio, LOCK *lock); extern int ProcLockWakeup(PROC_QUEUE *queue, LOCKMETHOD lockmethod,
extern int ProcLockWakeup(PROC_QUEUE *queue, LOCKMETHOD lockmethod, LOCK *lock); LOCK *lock);
extern void ProcAddLock(SHM_QUEUE *elem); extern void ProcAddLock(SHM_QUEUE *elem);
extern void ProcReleaseSpins(PROC *proc); extern void ProcReleaseSpins(PROC *proc);
extern void ProcFreeAllSemaphores(void); extern void ProcFreeAllSemaphores(void);
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment