Commit 564842a6 authored by Bruce Momjian's avatar Bruce Momjian

Hi, Bruce!

These are my last changes to lmgr fixing deadlock handling.
Please apply them to cvs...

Vadim
parent 9bbc1657
...@@ -7,7 +7,7 @@ ...@@ -7,7 +7,7 @@
* *
* *
* IDENTIFICATION * IDENTIFICATION
* $Header: /cvsroot/pgsql/src/backend/storage/lmgr/lock.c,v 1.51 1999/05/10 00:45:43 momjian Exp $ * $Header: /cvsroot/pgsql/src/backend/storage/lmgr/lock.c,v 1.52 1999/05/13 15:55:44 momjian Exp $
* *
* NOTES * NOTES
* Outside modules can create a lock table and acquire/release * Outside modules can create a lock table and acquire/release
...@@ -83,9 +83,9 @@ static int WaitOnLock(LOCKMETHOD lockmethod, LOCK *lock, LOCKMODE lockmode); ...@@ -83,9 +83,9 @@ static int WaitOnLock(LOCKMETHOD lockmethod, LOCK *lock, LOCKMODE lockmode);
#define LOCK_PRINT_AUX(where,lock,type) \ #define LOCK_PRINT_AUX(where,lock,type) \
TPRINTF(TRACE_ALL, \ TPRINTF(TRACE_ALL, \
"%s: lock(%x) tbl(%d) rel(%u) db(%d) obj(%u) mask(%x) " \ "%s: lock(%x) tbl(%d) rel(%u) db(%u) obj(%u) mask(%x) " \
"hold(%d,%d,%d,%d,%d)=%d " \ "hold(%d,%d,%d,%d,%d,%d,%d)=%d " \
"act(%d,%d,%d,%d,%d)=%d wait(%d) type(%s)", \ "act(%d,%d,%d,%d,%d,%d,%d)=%d wait(%d) type(%s)", \
where, \ where, \
MAKE_OFFSET(lock), \ MAKE_OFFSET(lock), \
lock->tag.lockmethod, \ lock->tag.lockmethod, \
...@@ -98,12 +98,16 @@ static int WaitOnLock(LOCKMETHOD lockmethod, LOCK *lock, LOCKMODE lockmode); ...@@ -98,12 +98,16 @@ static int WaitOnLock(LOCKMETHOD lockmethod, LOCK *lock, LOCKMODE lockmode);
lock->holders[3], \ lock->holders[3], \
lock->holders[4], \ lock->holders[4], \
lock->holders[5], \ lock->holders[5], \
lock->holders[6], \
lock->holders[7], \
lock->nHolding, \ lock->nHolding, \
lock->activeHolders[1], \ lock->activeHolders[1], \
lock->activeHolders[2], \ lock->activeHolders[2], \
lock->activeHolders[3], \ lock->activeHolders[3], \
lock->activeHolders[4], \ lock->activeHolders[4], \
lock->activeHolders[5], \ lock->activeHolders[5], \
lock->activeHolders[6], \
lock->activeHolders[7], \
lock->nActive, \ lock->nActive, \
lock->waitProcs.size, \ lock->waitProcs.size, \
lock_types[type]) lock_types[type])
...@@ -119,8 +123,8 @@ static int WaitOnLock(LOCKMETHOD lockmethod, LOCK *lock, LOCKMODE lockmode); ...@@ -119,8 +123,8 @@ static int WaitOnLock(LOCKMETHOD lockmethod, LOCK *lock, LOCKMODE lockmode);
#define XID_PRINT_AUX(where,xidentP) \ #define XID_PRINT_AUX(where,xidentP) \
TPRINTF(TRACE_ALL, \ TPRINTF(TRACE_ALL, \
"%s: xid(%x) lock(%x) tbl(%d) pid(%d) xid(%d) " \ "%s: xid(%x) lock(%x) tbl(%d) pid(%d) xid(%u) " \
"hold(%d,%d,%d,%d,%d)=%d", \ "hold(%d,%d,%d,%d,%d,%d,%d)=%d", \
where, \ where, \
MAKE_OFFSET(xidentP), \ MAKE_OFFSET(xidentP), \
xidentP->tag.lock, \ xidentP->tag.lock, \
...@@ -132,6 +136,8 @@ static int WaitOnLock(LOCKMETHOD lockmethod, LOCK *lock, LOCKMODE lockmode); ...@@ -132,6 +136,8 @@ static int WaitOnLock(LOCKMETHOD lockmethod, LOCK *lock, LOCKMODE lockmode);
xidentP->holders[3], \ xidentP->holders[3], \
xidentP->holders[4], \ xidentP->holders[4], \
xidentP->holders[5], \ xidentP->holders[5], \
xidentP->holders[6], \
xidentP->holders[7], \
xidentP->nHolding) xidentP->nHolding)
#else /* !LOCK_MGR_DEBUG */ #else /* !LOCK_MGR_DEBUG */
...@@ -1561,19 +1567,26 @@ LockingDisabled() ...@@ -1561,19 +1567,26 @@ LockingDisabled()
* We have already locked the master lock before being called. * We have already locked the master lock before being called.
*/ */
bool bool
DeadLockCheck(SHM_QUEUE *lockQueue, LOCK *findlock, bool skip_check) DeadLockCheck(void *proc, LOCK *findlock)
{ {
int done;
XIDLookupEnt *xidLook = NULL; XIDLookupEnt *xidLook = NULL;
XIDLookupEnt *tmp = NULL; XIDLookupEnt *tmp = NULL;
PROC *thisProc = (PROC*) proc,
*waitProc;
SHM_QUEUE *lockQueue = &(thisProc->lockQueue);
SHMEM_OFFSET end = MAKE_OFFSET(lockQueue); SHMEM_OFFSET end = MAKE_OFFSET(lockQueue);
LOCK *lock; LOCK *lock;
PROC_QUEUE *waitQueue;
int i,
j;
bool first_run = (thisProc == MyProc),
done;
static PROC *checked_procs[MAXBACKENDS]; static PROC *checked_procs[MAXBACKENDS];
static int nprocs; static int nprocs;
/* initialize at start of recursion */ /* initialize at start of recursion */
if (skip_check) if (first_run)
{ {
checked_procs[0] = MyProc; checked_procs[0] = MyProc;
nprocs = 1; nprocs = 1;
...@@ -1593,74 +1606,186 @@ DeadLockCheck(SHM_QUEUE *lockQueue, LOCK *findlock, bool skip_check) ...@@ -1593,74 +1606,186 @@ DeadLockCheck(SHM_QUEUE *lockQueue, LOCK *findlock, bool skip_check)
LOCK_PRINT("DeadLockCheck", lock, 0); LOCK_PRINT("DeadLockCheck", lock, 0);
if (lock->tag.relId == 0) /* user' lock */
goto nxtl;
/* /*
* This is our only check to see if we found the lock we want. * waitLock is always in lockQueue of waiting proc,
* * if !first_run then upper caller will handle waitProcs
* The lock we are waiting for is already in MyProc->lockQueue so we * queue of waitLock.
* need to skip it here. We are trying to find it in someone
* else's lockQueue. bjm
*/ */
if (lock == findlock && !skip_check) if (thisProc->waitLock == lock && !first_run)
return true; goto nxtl;
/*
* If we found proc holding findlock and sleeping on some my
* other lock then we have to check does it block me or
* another waiters.
*/
if (lock == findlock && !first_run)
{ {
PROC_QUEUE *waitQueue = &(lock->waitProcs); LOCKMETHODCTL *lockctl =
PROC *proc; LockMethodTable[DEFAULT_LOCKMETHOD]->ctl;
int i; int lm;
int j;
proc = (PROC *) MAKE_PTR(waitQueue->links.prev); Assert(xidLook->nHolding > 0);
for (i = 0; i < waitQueue->size; i++) for (lm = 1; lm <= lockctl->numLockModes; lm++)
{ {
if (xidLook->holders[lm] > 0 &&
lockctl->conflictTab[lm] & findlock->waitMask)
return true;
}
/*
* Else - get the next lock from thisProc's lockQueue
*/
goto nxtl;
}
waitQueue = &(lock->waitProcs);
waitProc = (PROC *) MAKE_PTR(waitQueue->links.prev);
for (i = 0; i < waitQueue->size; i++)
{
if (waitProc == thisProc)
{
Assert(waitProc->waitLock == lock);
Assert(waitProc == MyProc);
waitProc = (PROC *) MAKE_PTR(waitProc->links.prev);
continue;
}
if (lock == findlock) /* first_run also true */
{
LOCKMETHODCTL *lockctl =
LockMethodTable[DEFAULT_LOCKMETHOD]->ctl;
/* /*
* If I hold some locks on findlock and another proc * If me blocked by his holdlock...
* waits on it holding locks too - check if we are
* waiting one another.
*/ */
if (proc != MyProc && if (lockctl->conflictTab[MyProc->token] & waitProc->holdLock)
lock == findlock && /* skip_check also true */
MyProc->holdLock)
{ {
LOCKMETHODCTL *lockctl = /* and he blocked by me -> deadlock */
LockMethodTable[DEFAULT_LOCKMETHOD]->ctl; if (lockctl->conflictTab[waitProc->token] & MyProc->holdLock)
Assert(skip_check);
if (lockctl->conflictTab[MyProc->token] & proc->holdLock &&
lockctl->conflictTab[proc->token] & MyProc->holdLock)
return true; return true;
/* we shouldn't look at lockQueue of our blockers */
waitProc = (PROC *) MAKE_PTR(waitProc->links.prev);
continue;
} }
/* /*
* No sense in looking at the wait queue of the lock we * If he isn't blocked by me and we request non-conflicting
* are looking for. If lock == findlock, and I got here, * lock modes - no deadlock here because of he isn't
* skip_check must be true too. * blocked by me in any sence (explicitle or implicitly).
* Note that we don't do like test if !first_run
* (when thisProc is holder and non-waiter on lock) and so
* we call DeadLockCheck below for every waitProc in
* thisProc->lockQueue, even for waitProc-s un-blocked
* by thisProc. Should we? This could save us some time...
*/ */
if (lock != findlock) if (!(lockctl->conflictTab[waitProc->token] & MyProc->holdLock) &&
!(lockctl->conflictTab[waitProc->token] & (1 << MyProc->token)))
{ {
for (j = 0; j < nprocs; j++) waitProc = (PROC *) MAKE_PTR(waitProc->links.prev);
if (checked_procs[j] == proc) continue;
break; }
if (j >= nprocs && lock != findlock) }
{
Assert(nprocs < MAXBACKENDS);
checked_procs[nprocs++] = proc;
/*
* Look in lockQueue of this waitProc, if didn't do this before.
*/
for (j = 0; j < nprocs; j++)
{
if (checked_procs[j] == waitProc)
break;
}
if (j >= nprocs)
{
Assert(nprocs < MAXBACKENDS);
checked_procs[nprocs++] = waitProc;
if (DeadLockCheck(waitProc, findlock))
{
LOCKMETHODCTL *lockctl =
LockMethodTable[DEFAULT_LOCKMETHOD]->ctl;
int holdLock;
/*
* Ok, but is waitProc waiting for me (thisProc) ?
*/
if (thisProc->waitLock == lock)
{
Assert(first_run);
holdLock = thisProc->holdLock;
}
else /* should we cache holdLock ? */
{
int lm, tmpMask = 2;
Assert(xidLook->nHolding > 0);
for (holdLock = 0, lm = 1;
lm <= lockctl->numLockModes;
lm++, tmpMask <<= 1)
{
if (xidLook->holders[lm] > 0)
holdLock |= tmpMask;
}
Assert(holdLock != 0);
}
if (lockctl->conflictTab[waitProc->token] & holdLock)
{
/* /*
* For non-MyProc entries, we are looking only * Last attempt to avoid deadlock - try to wakeup
* waiters, not necessarily people who already * myself.
* hold locks and are waiting. Now we check for
* cases where we have two or more tables in a
* deadlock. We do this by continuing to search
* for someone holding a lock bjm
*/ */
if (DeadLockCheck(&(proc->lockQueue), findlock, false)) if (first_run)
return true; {
if (LockResolveConflicts(DEFAULT_LOCKMETHOD,
MyProc->waitLock,
MyProc->token,
MyProc->xid,
NULL) == STATUS_OK)
{
GrantLock(MyProc->waitLock, MyProc->token);
(MyProc->waitLock->waitProcs.size)--;
ProcWakeup(MyProc, NO_ERROR);
return false;
}
}
return true;
}
/*
* Hell! Is he blocked by any (other) holder ?
*/
if (LockResolveConflicts(DEFAULT_LOCKMETHOD,
lock,
waitProc->token,
waitProc->xid,
NULL) != STATUS_OK)
{
/*
* Blocked by others - no deadlock...
*/
#ifdef DEADLOCK_DEBUG
LOCK_PRINT("DeadLockCheck: blocked by others",
lock, waitProc->token);
#endif
waitProc = (PROC *) MAKE_PTR(waitProc->links.prev);
continue;
} }
/*
* Well - wakeup this guy! This is the case of
* implicit blocking: thisProc blocked someone who blocked
* waitProc by the fact that he (someone) is already
* waiting for lock (we do this for anti-starving).
*/
GrantLock(lock, waitProc->token);
waitQueue->size--;
waitProc = ProcWakeup(waitProc, NO_ERROR);
continue;
} }
proc = (PROC *) MAKE_PTR(proc->links.prev);
} }
waitProc = (PROC *) MAKE_PTR(waitProc->links.prev);
} }
nxtl:;
if (done) if (done)
break; break;
SHMQueueFirst(&xidLook->queue, (Pointer *) &tmp, &tmp->queue); SHMQueueFirst(&xidLook->queue, (Pointer *) &tmp, &tmp->queue);
......
...@@ -7,7 +7,7 @@ ...@@ -7,7 +7,7 @@
* *
* *
* IDENTIFICATION * IDENTIFICATION
* $Header: /cvsroot/pgsql/src/backend/storage/lmgr/proc.c,v 1.54 1999/05/07 01:23:04 vadim Exp $ * $Header: /cvsroot/pgsql/src/backend/storage/lmgr/proc.c,v 1.55 1999/05/13 15:55:44 momjian Exp $
* *
*------------------------------------------------------------------------- *-------------------------------------------------------------------------
*/ */
...@@ -46,7 +46,7 @@ ...@@ -46,7 +46,7 @@
* This is so that we can support more backends. (system-wide semaphore * This is so that we can support more backends. (system-wide semaphore
* sets run out pretty fast.) -ay 4/95 * sets run out pretty fast.) -ay 4/95
* *
* $Header: /cvsroot/pgsql/src/backend/storage/lmgr/proc.c,v 1.54 1999/05/07 01:23:04 vadim Exp $ * $Header: /cvsroot/pgsql/src/backend/storage/lmgr/proc.c,v 1.55 1999/05/13 15:55:44 momjian Exp $
*/ */
#include <sys/time.h> #include <sys/time.h>
#include <unistd.h> #include <unistd.h>
...@@ -78,7 +78,6 @@ ...@@ -78,7 +78,6 @@
#include "utils/trace.h" #include "utils/trace.h"
static void HandleDeadLock(int sig); static void HandleDeadLock(int sig);
static PROC *ProcWakeup(PROC *proc, int errType);
static void ProcFreeAllSemaphores(void); static void ProcFreeAllSemaphores(void);
#define DeadlockCheckTimer pg_options[OPT_DEADLOCKTIMEOUT] #define DeadlockCheckTimer pg_options[OPT_DEADLOCKTIMEOUT]
...@@ -640,7 +639,7 @@ rt:; ...@@ -640,7 +639,7 @@ rt:;
* remove the process from the wait queue and set its links invalid. * remove the process from the wait queue and set its links invalid.
* RETURN: the next process in the wait queue. * RETURN: the next process in the wait queue.
*/ */
static PROC * PROC *
ProcWakeup(PROC *proc, int errType) ProcWakeup(PROC *proc, int errType)
{ {
PROC *retProc; PROC *retProc;
...@@ -806,10 +805,10 @@ HandleDeadLock(int sig) ...@@ -806,10 +805,10 @@ HandleDeadLock(int sig)
DumpAllLocks(); DumpAllLocks();
#endif #endif
if (!DeadLockCheck(&(MyProc->lockQueue), MyProc->waitLock, true)) MyProc->errType = STATUS_NOT_FOUND;
if (!DeadLockCheck(MyProc, MyProc->waitLock))
{ {
UnlockLockTable(); UnlockLockTable();
MyProc->errType = STATUS_NOT_FOUND;
return; return;
} }
......
...@@ -6,7 +6,7 @@ ...@@ -6,7 +6,7 @@
* *
* Copyright (c) 1994, Regents of the University of California * Copyright (c) 1994, Regents of the University of California
* *
* $Id: lock.h,v 1.25 1999/05/07 01:23:07 vadim Exp $ * $Id: lock.h,v 1.26 1999/05/13 15:55:44 momjian Exp $
* *
*------------------------------------------------------------------------- *-------------------------------------------------------------------------
*/ */
...@@ -249,8 +249,7 @@ extern void GrantLock(LOCK *lock, LOCKMODE lockmode); ...@@ -249,8 +249,7 @@ extern void GrantLock(LOCK *lock, LOCKMODE lockmode);
extern bool LockReleaseAll(LOCKMETHOD lockmethod, SHM_QUEUE *lockQueue); extern bool LockReleaseAll(LOCKMETHOD lockmethod, SHM_QUEUE *lockQueue);
extern int LockShmemSize(int maxBackends); extern int LockShmemSize(int maxBackends);
extern bool LockingDisabled(void); extern bool LockingDisabled(void);
extern bool DeadLockCheck(SHM_QUEUE *lockQueue, LOCK *findlock, extern bool DeadLockCheck(void *proc, LOCK *findlock);
bool skip_check);
#ifdef DEADLOCK_DEBUG #ifdef DEADLOCK_DEBUG
extern void DumpLocks(void); extern void DumpLocks(void);
......
...@@ -6,7 +6,7 @@ ...@@ -6,7 +6,7 @@
* *
* Copyright (c) 1994, Regents of the University of California * Copyright (c) 1994, Regents of the University of California
* *
* $Id: proc.h,v 1.21 1999/05/07 01:23:07 vadim Exp $ * $Id: proc.h,v 1.22 1999/05/13 15:55:45 momjian Exp $
* *
*------------------------------------------------------------------------- *-------------------------------------------------------------------------
*/ */
...@@ -119,6 +119,7 @@ extern bool ProcRemove(int pid); ...@@ -119,6 +119,7 @@ extern bool ProcRemove(int pid);
extern void ProcQueueInit(PROC_QUEUE *queue); extern void ProcQueueInit(PROC_QUEUE *queue);
extern int ProcSleep(PROC_QUEUE *queue, LOCKMETHODCTL *lockctl, int token, extern int ProcSleep(PROC_QUEUE *queue, LOCKMETHODCTL *lockctl, int token,
LOCK *lock); LOCK *lock);
extern PROC *ProcWakeup(PROC *proc, int errType);
extern int ProcLockWakeup(PROC_QUEUE *queue, LOCKMETHOD lockmethod, extern int ProcLockWakeup(PROC_QUEUE *queue, LOCKMETHOD lockmethod,
LOCK *lock); LOCK *lock);
extern void ProcAddLock(SHM_QUEUE *elem); extern void ProcAddLock(SHM_QUEUE *elem);
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment