Commit 862927f4 authored by Bruce Momjian

Real deadlock detection.

parent 0e913671
...@@ -7,7 +7,7 @@ ...@@ -7,7 +7,7 @@
* *
* *
* IDENTIFICATION * IDENTIFICATION
* $Header: /cvsroot/pgsql/src/backend/storage/lmgr/lock.c,v 1.21 1998/01/25 05:14:02 momjian Exp $ * $Header: /cvsroot/pgsql/src/backend/storage/lmgr/lock.c,v 1.22 1998/01/27 03:00:28 momjian Exp $
* *
* NOTES * NOTES
* Outside modules can create a lock table and acquire/release * Outside modules can create a lock table and acquire/release
...@@ -1201,7 +1201,7 @@ LockReleaseAll(LockTableId tableId, SHM_QUEUE *lockQueue) ...@@ -1201,7 +1201,7 @@ LockReleaseAll(LockTableId tableId, SHM_QUEUE *lockQueue)
done = (xidLook->queue.next == end); done = (xidLook->queue.next == end);
lock = (LOCK *) MAKE_PTR(xidLook->tag.lock); lock = (LOCK *) MAKE_PTR(xidLook->tag.lock);
LOCK_PRINT("ReleaseAll", (&lock->tag), 0); LOCK_PRINT("LockReleaseAll", (&lock->tag), 0);
#ifdef USER_LOCKS #ifdef USER_LOCKS
...@@ -1307,11 +1307,7 @@ LockReleaseAll(LockTableId tableId, SHM_QUEUE *lockQueue) ...@@ -1307,11 +1307,7 @@ LockReleaseAll(LockTableId tableId, SHM_QUEUE *lockQueue)
|| !found) || !found)
{ {
SpinRelease(masterLock); SpinRelease(masterLock);
#ifdef USER_LOCKS
elog(NOTICE, "LockReleaseAll: xid table corrupted"); elog(NOTICE, "LockReleaseAll: xid table corrupted");
#else
elog(NOTICE, "LockReplace: xid table corrupted");
#endif
return (FALSE); return (FALSE);
} }
...@@ -1329,11 +1325,7 @@ LockReleaseAll(LockTableId tableId, SHM_QUEUE *lockQueue) ...@@ -1329,11 +1325,7 @@ LockReleaseAll(LockTableId tableId, SHM_QUEUE *lockQueue)
if ((!lock) || (!found)) if ((!lock) || (!found))
{ {
SpinRelease(masterLock); SpinRelease(masterLock);
#ifdef USER_LOCKS
elog(NOTICE, "LockReleaseAll: cannot remove lock from HTAB"); elog(NOTICE, "LockReleaseAll: cannot remove lock from HTAB");
#else
elog(NOTICE, "LockReplace: cannot remove lock from HTAB");
#endif
return (FALSE); return (FALSE);
} }
} }
...@@ -1415,6 +1407,86 @@ LockingDisabled() ...@@ -1415,6 +1407,86 @@ LockingDisabled()
return LockingIsDisabled; return LockingIsDisabled;
} }
/*
 * DeadLockCheck -- look for a deadlock involving a sleeping process
 *
 * User locks never block and have no timer associated with a block, so
 * there is nothing to test for them.
 *
 * lockQueue	queue of XIDLookupEnt entries, one per lock held by the
 *				process being examined
 * findlock		the lock the checking process is sleeping on
 * skip_check	true on the outermost call (made on our own lockQueue),
 *				false on the recursive calls made on other processes'
 *				lock queues
 *
 * On the outermost call an entry for findlock itself is ignored, because
 * the lock we are waiting for is already in MyProc->lockQueue; for every
 * other lock in the queue, each process waiting on it (other than MyProc)
 * has its own lockQueue searched for findlock.  The recursive calls do
 * not recurse any further, so only one level of waiters is examined.
 *
 * Returns true if findlock was found in a waiter's lockQueue (deadlock),
 * false otherwise.
 *
 * The caller must already hold the master lock.
 */
bool
DeadLockCheck(SHM_QUEUE *lockQueue, LOCK *findlock, bool skip_check)
{
	XIDLookupEnt *curEnt = NULL;
	XIDLookupEnt *nextEnt = NULL;
	SHMEM_OFFSET queueEnd = MAKE_OFFSET(lockQueue);
	LOCK	   *heldLock;
	int			isLast;

	/* Nothing held means nobody can be waiting on us. */
	if (SHMQueueEmpty(lockQueue))
		return false;

	SHMQueueFirst(lockQueue, (Pointer *) &curEnt, &curEnt->queue);
	XID_PRINT("DeadLockCheck", curEnt);

	do
	{
		/*
		 * XXX The walk relies on the shared memory queue being circular
		 * and on knowledge of its internal layout: the entry whose next
		 * link points back at the queue head is the last one.
		 */
		isLast = (curEnt->queue.next == queueEnd);
		heldLock = (LOCK *) MAKE_PTR(curEnt->tag.lock);
		LOCK_PRINT("DeadLockCheck", (&heldLock->tag), 0);

		if (heldLock != findlock)
		{
			PROC_QUEUE *waiters = &(heldLock->waitProcs);
			PROC	   *waiter = (PROC *) MAKE_PTR(waiters->links.prev);
			int			n;

			for (n = 0; n < waiters->size; n++)
			{
				/*
				 * Only the outermost call descends into the waiters, and
				 * never into ourselves; this prevents endless loops.
				 */
				if (skip_check && waiter != MyProc)
				{
					/* A single hit is enough; stop right away. */
					if (DeadLockCheck(&(waiter->lockQueue), findlock, false))
						return true;
				}
				waiter = (PROC *) MAKE_PTR(waiter->links.prev);
			}
		}
		else if (!skip_check)
		{
			/*
			 * This is the only place a match is reported: findlock was
			 * found in someone else's lockQueue.  (With skip_check set
			 * the match is in our own queue and is simply passed over.)
			 */
			return true;
		}

		if (!isLast)
		{
			SHMQueueFirst(&curEnt->queue, (Pointer *) &nextEnt, &nextEnt->queue);
			curEnt = nextEnt;
		}
	} while (!isLast);

	/* Walked the whole queue without finding findlock: no deadlock. */
	return false;
}
#ifdef DEADLOCK_DEBUG #ifdef DEADLOCK_DEBUG
/* /*
* Dump all locks. Must have already acquired the masterLock. * Dump all locks. Must have already acquired the masterLock.
......
...@@ -7,7 +7,7 @@ ...@@ -7,7 +7,7 @@
* *
* *
* IDENTIFICATION * IDENTIFICATION
* $Header: /cvsroot/pgsql/src/backend/storage/lmgr/proc.c,v 1.28 1998/01/25 05:14:09 momjian Exp $ * $Header: /cvsroot/pgsql/src/backend/storage/lmgr/proc.c,v 1.29 1998/01/27 03:00:29 momjian Exp $
* *
*------------------------------------------------------------------------- *-------------------------------------------------------------------------
*/ */
...@@ -46,7 +46,7 @@ ...@@ -46,7 +46,7 @@
* This is so that we can support more backends. (system-wide semaphore * This is so that we can support more backends. (system-wide semaphore
* sets run out pretty fast.) -ay 4/95 * sets run out pretty fast.) -ay 4/95
* *
* $Header: /cvsroot/pgsql/src/backend/storage/lmgr/proc.c,v 1.28 1998/01/25 05:14:09 momjian Exp $ * $Header: /cvsroot/pgsql/src/backend/storage/lmgr/proc.c,v 1.29 1998/01/27 03:00:29 momjian Exp $
*/ */
#include <sys/time.h> #include <sys/time.h>
#include <unistd.h> #include <unistd.h>
...@@ -442,7 +442,7 @@ ProcQueueInit(PROC_QUEUE *queue) ...@@ -442,7 +442,7 @@ ProcQueueInit(PROC_QUEUE *queue)
* NOTES: The process queue is now a priority queue for locking. * NOTES: The process queue is now a priority queue for locking.
*/ */
int int
ProcSleep(PROC_QUEUE *queue, ProcSleep(PROC_QUEUE *waitQueue,
SPINLOCK spinlock, SPINLOCK spinlock,
int token, int token,
int prio, int prio,
...@@ -453,8 +453,8 @@ ProcSleep(PROC_QUEUE *queue, ...@@ -453,8 +453,8 @@ ProcSleep(PROC_QUEUE *queue,
struct itimerval timeval, struct itimerval timeval,
dummy; dummy;
proc = (PROC *) MAKE_PTR(queue->links.prev); proc = (PROC *) MAKE_PTR(waitQueue->links.prev);
for (i = 0; i < queue->size; i++) for (i = 0; i < waitQueue->size; i++)
{ {
if (proc->prio >= prio) if (proc->prio >= prio)
proc = (PROC *) MAKE_PTR(proc->links.prev); proc = (PROC *) MAKE_PTR(proc->links.prev);
...@@ -478,36 +478,38 @@ ProcSleep(PROC_QUEUE *queue, ...@@ -478,36 +478,38 @@ ProcSleep(PROC_QUEUE *queue,
* ------------------- * -------------------
*/ */
SHMQueueInsertTL(&(proc->links), &(MyProc->links)); SHMQueueInsertTL(&(proc->links), &(MyProc->links));
queue->size++; waitQueue->size++;
SpinRelease(spinlock); SpinRelease(spinlock);
/* -------------- /* --------------
* Postgres does not have any deadlock detection code and for this * We set this so we can wake up periodically and check for a deadlock.
* reason we must set a timer to wake up the process in the event of * If a deadlock is detected, the handler releases the processes
* a deadlock. For now the timer is set for 1 minute and we assume that * semaphore and aborts the current transaction.
* any process which sleeps for this amount of time is deadlocked and will
* receive a SIGALRM signal. The handler should release the processes
* semaphore and abort the current transaction.
* *
* Need to zero out struct to set the interval and the micro seconds fields * Need to zero out struct to set the interval and the micro seconds fields
* to 0. * to 0.
* -------------- * --------------
*/ */
MemSet(&timeval, 0, sizeof(struct itimerval)); MemSet(&timeval, 0, sizeof(struct itimerval));
timeval.it_value.tv_sec = DEADLOCK_TIMEOUT; timeval.it_value.tv_sec = DEADLOCK_CHECK_TIMER;
if (setitimer(ITIMER_REAL, &timeval, &dummy)) do
elog(FATAL, "ProcSleep: Unable to set timer for process wakeup"); {
MyProc->errType = NO_ERROR; /* reset flag after deadlock check */
/* -------------- if (setitimer(ITIMER_REAL, &timeval, &dummy))
* if someone wakes us between SpinRelease and IpcSemaphoreLock, elog(FATAL, "ProcSleep: Unable to set timer for process wakeup");
* IpcSemaphoreLock will not block. The wakeup is "saved" by
* the semaphore implementation.
* --------------
*/
IpcSemaphoreLock(MyProc->sem.semId, MyProc->sem.semNum, IpcExclusiveLock);
/* --------------
* if someone wakes us between SpinRelease and IpcSemaphoreLock,
* IpcSemaphoreLock will not block. The wakeup is "saved" by
* the semaphore implementation.
* --------------
*/
IpcSemaphoreLock(MyProc->sem.semId, MyProc->sem.semNum, IpcExclusiveLock);
} while (MyProc->errType == STATUS_NOT_FOUND); /* sleep after deadlock check */
/* --------------- /* ---------------
* We were awoken before a timeout - now disable the timer * We were awoken before a timeout - now disable the timer
* --------------- * ---------------
...@@ -615,10 +617,9 @@ ProcAddLock(SHM_QUEUE *elem) ...@@ -615,10 +617,9 @@ ProcAddLock(SHM_QUEUE *elem)
} }
/* -------------------- /* --------------------
* We only get to this routine if we got SIGALRM after DEADLOCK_TIMEOUT * We only get to this routine if we got SIGALRM after DEADLOCK_CHECK_TIMER
* while waiting for a lock to be released by some other process. After * while waiting for a lock to be released by some other process. If we have
* the one minute deadline we assume we have a deadlock and must abort * a real deadlock, we must also indicate that I'm no longer waiting
* this transaction. We must also indicate that I'm no longer waiting
* on a lock so that other processes don't try to wake me up and screw * on a lock so that other processes don't try to wake me up and screw
* up my semaphore. * up my semaphore.
* -------------------- * --------------------
...@@ -665,12 +666,19 @@ HandleDeadLock(int sig) ...@@ -665,12 +666,19 @@ HandleDeadLock(int sig)
return; return;
} }
mywaitlock = MyProc->waitLock;
#ifdef DEADLOCK_DEBUG #ifdef DEADLOCK_DEBUG
DumpLocks(); DumpLocks();
#endif #endif
if (!DeadLockCheck(&(MyProc->lockQueue), MyProc->waitLock, true))
{
UnlockLockTable();
MyProc->errType = STATUS_NOT_FOUND;
return;
}
mywaitlock = MyProc->waitLock;
/* ------------------------ /* ------------------------
* Get this process off the lock's wait queue * Get this process off the lock's wait queue
* ------------------------ * ------------------------
...@@ -701,8 +709,7 @@ HandleDeadLock(int sig) ...@@ -701,8 +709,7 @@ HandleDeadLock(int sig)
*/ */
UnlockLockTable(); UnlockLockTable();
elog(NOTICE, "Timeout interval reached -- possible deadlock."); elog(NOTICE, "Deadlock detected -- See the lock(l) manual page for a possible cause.");
elog(NOTICE, "See the lock(l) manual page for a possible cause.");
return; return;
} }
......
...@@ -210,14 +210,16 @@ extern void srandom(int seed); ...@@ -210,14 +210,16 @@ extern void srandom(int seed);
code seems broken without it, Bruce Momjian */ code seems broken without it, Bruce Momjian */
/* #define LOARRAY */ /* #define LOARRAY */
/* This is the time, in seconds, at which a given backend server /*
* will wait on a lock before deciding to abort the transaction * As soon as the backend blocks on a lock, it waits this number of seconds
* (this is what we do in lieu of deadlock detection). * before checking for a deadlock. If not, it keeps checking every this
* * number of seconds.
* Low numbers are not recommended as they will tend to cause * We don't check for deadlocks just before sleeping because a deadlock is
* false aborts if many transactions are long-lived. * a rare event, and checking is an expensive operation.
* We only detect deadlocks between two processes, not three or more, but
* these are the most common.
*/ */
#define DEADLOCK_TIMEOUT 60 #define DEADLOCK_CHECK_TIMER 60
/* /*
* This flag enables the use of indexes in plans generated for function
......
...@@ -6,7 +6,7 @@ ...@@ -6,7 +6,7 @@
* *
* Copyright (c) 1994, Regents of the University of California * Copyright (c) 1994, Regents of the University of California
* *
* $Id: lock.h,v 1.9 1998/01/24 22:50:11 momjian Exp $ * $Id: lock.h,v 1.10 1998/01/27 03:00:43 momjian Exp $
* *
*------------------------------------------------------------------------- *-------------------------------------------------------------------------
*/ */
...@@ -214,7 +214,7 @@ extern void GrantLock(LOCK *lock, LOCKT lockt); ...@@ -214,7 +214,7 @@ extern void GrantLock(LOCK *lock, LOCKT lockt);
extern bool LockReleaseAll(LockTableId tableId, SHM_QUEUE *lockQueue); extern bool LockReleaseAll(LockTableId tableId, SHM_QUEUE *lockQueue);
extern int LockShmemSize(void); extern int LockShmemSize(void);
extern bool LockingDisabled(void); extern bool LockingDisabled(void);
extern bool DeadLockCheck(SHM_QUEUE *lockQueue, LOCK *findlock, bool skip_check);
#ifdef DEADLOCK_DEBUG #ifdef DEADLOCK_DEBUG
extern void DumpLocks(void); extern void DumpLocks(void);
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment