deadlock.c 25.9 KB
Newer Older
1 2 3 4 5 6 7 8 9
/*-------------------------------------------------------------------------
 *
 * deadlock.c
 *	  POSTGRES deadlock detection code
 *
 * See src/backend/storage/lmgr/README for a description of the deadlock
 * detection and resolution algorithms.
 *
 *
10
 * Portions Copyright (c) 1996-2007, PostgreSQL Global Development Group
11 12 13 14
 * Portions Copyright (c) 1994, Regents of the University of California
 *
 *
 * IDENTIFICATION
15
 *	  $PostgreSQL: pgsql/src/backend/storage/lmgr/deadlock.c,v 1.48 2007/06/19 20:13:21 tgl Exp $
16 17 18 19
 *
 *	Interface:
 *
 *	DeadLockCheck()
20 21
 *	DeadLockReport()
 *	RememberSimpleDeadLock()
22 23 24 25 26 27 28
 *	InitDeadLockChecking()
 *
 *-------------------------------------------------------------------------
 */
#include "postgres.h"

#include "miscadmin.h"
29
#include "storage/lmgr.h"
30 31 32 33 34
#include "storage/proc.h"
#include "utils/memutils.h"


/* One edge in the waits-for graph */
35 36
typedef struct
{
Jan Wieck's avatar
Jan Wieck committed
37 38
	PGPROC	   *waiter;			/* the waiting process */
	PGPROC	   *blocker;		/* the process it is waiting for */
39 40
	int			pred;			/* workspace for TopoSort */
	int			link;			/* workspace for TopoSort */
41 42 43
} EDGE;

/* One potential reordering of a lock's wait queue */
44 45 46
typedef struct
{
	LOCK	   *lock;			/* the lock whose wait queue is described */
Jan Wieck's avatar
Jan Wieck committed
47
	PGPROC	  **procs;			/* array of PGPROC *'s in new wait order */
48
	int			nProcs;
49 50
} WAIT_ORDER;

51
/*
Bruce Momjian's avatar
Bruce Momjian committed
52
 * Information saved about each edge in a detected deadlock cycle.	This
53 54
 * is used to print a diagnostic message upon failure.
 *
55 56 57
 * Note: because we want to examine this info after releasing the lock
 * manager's partition locks, we can't just store LOCK and PGPROC pointers;
 * we must extract out all the info we want to be able to print.
58 59 60 61 62 63
 */
typedef struct
{
	LOCKTAG		locktag;		/* ID of awaited lock object */
	LOCKMODE	lockmode;		/* type of lock we're waiting for */
	int			pid;			/* PID of blocked backend */
64
} DEADLOCK_INFO;
65

66

Jan Wieck's avatar
Jan Wieck committed
67
static bool DeadLockCheckRecurse(PGPROC *proc);
68
static int	TestConfiguration(PGPROC *startProc);
Jan Wieck's avatar
Jan Wieck committed
69
static bool FindLockCycle(PGPROC *checkProc,
70
			  EDGE *softEdges, int *nSoftEdges);
71
static bool FindLockCycleRecurse(PGPROC *checkProc, int depth,
72
					 EDGE *softEdges, int *nSoftEdges);
73 74
static bool ExpandConstraints(EDGE *constraints, int nConstraints);
static bool TopoSort(LOCK *lock, EDGE *constraints, int nConstraints,
Jan Wieck's avatar
Jan Wieck committed
75
		 PGPROC **ordering);
76

77 78 79 80 81 82 83 84 85 86
#ifdef DEBUG_DEADLOCK
static void PrintLockQueue(LOCK *lock, const char *info);
#endif


/*
 * Working space for the deadlock detector
 */

/* Workspace for FindLockCycle */
Bruce Momjian's avatar
Bruce Momjian committed
87
static PGPROC **visitedProcs;	/* Array of visited procs */
88 89
static int	nVisitedProcs;

90
/* Workspace for TopoSort */
Jan Wieck's avatar
Jan Wieck committed
91
static PGPROC **topoProcs;		/* Array of not-yet-output procs */
92 93
static int *beforeConstraints;	/* Counts of remaining before-constraints */
static int *afterConstraints;	/* List head for after-constraints */
94

95 96
/* Output area for ExpandConstraints */
static WAIT_ORDER *waitOrders;	/* Array of proposed queue rearrangements */
97
static int	nWaitOrders;
Bruce Momjian's avatar
Bruce Momjian committed
98
static PGPROC **waitOrderProcs; /* Space for waitOrders queue contents */
99

100 101
/* Current list of constraints being considered */
static EDGE *curConstraints;
102 103 104
static int	nCurConstraints;
static int	maxCurConstraints;

105 106
/* Storage space for results from FindLockCycle */
static EDGE *possibleConstraints;
107 108
static int	nPossibleConstraints;
static int	maxPossibleConstraints;
109 110
static DEADLOCK_INFO *deadlockDetails;
static int	nDeadlockDetails;
111 112 113 114 115 116


/*
 * InitDeadLockChecking -- initialize deadlock checker during backend startup
 *
 * This does per-backend initialization of the deadlock checker; primarily,
117
 * allocation of working memory for DeadLockCheck.	We do this per-backend
118 119
 * since there's no percentage in making the kernel do copy-on-write
 * inheritance of workspace from the postmaster.  We want to allocate the
120 121 122
 * space at startup because (a) the deadlock checker might be invoked when
 * there's no free memory left, and (b) the checker is normally run inside a
 * signal handler, which is a very dangerous place to invoke palloc from.
123 124 125 126
 */
void
InitDeadLockChecking(void)
{
127
	MemoryContext oldcxt;
128 129 130 131 132

	/* Make sure allocations are permanent */
	oldcxt = MemoryContextSwitchTo(TopMemoryContext);

	/*
133 134
	 * FindLockCycle needs at most MaxBackends entries in visitedProcs[] and
	 * deadlockDetails[].
135
	 */
Jan Wieck's avatar
Jan Wieck committed
136
	visitedProcs = (PGPROC **) palloc(MaxBackends * sizeof(PGPROC *));
137
	deadlockDetails = (DEADLOCK_INFO *) palloc(MaxBackends * sizeof(DEADLOCK_INFO));
138 139

	/*
140 141
	 * TopoSort needs to consider at most MaxBackends wait-queue entries, and
	 * it needn't run concurrently with FindLockCycle.
142 143 144 145 146 147 148
	 */
	topoProcs = visitedProcs;	/* re-use this space */
	beforeConstraints = (int *) palloc(MaxBackends * sizeof(int));
	afterConstraints = (int *) palloc(MaxBackends * sizeof(int));

	/*
	 * We need to consider rearranging at most MaxBackends/2 wait queues
149 150 151
	 * (since it takes at least two waiters in a queue to create a soft edge),
	 * and the expanded form of the wait queues can't involve more than
	 * MaxBackends total waiters.
152
	 */
153
	waitOrders = (WAIT_ORDER *)
154
		palloc((MaxBackends / 2) * sizeof(WAIT_ORDER));
Jan Wieck's avatar
Jan Wieck committed
155
	waitOrderProcs = (PGPROC **) palloc(MaxBackends * sizeof(PGPROC *));
156 157

	/*
158 159 160 161 162 163
	 * Allow at most MaxBackends distinct constraints in a configuration. (Is
	 * this enough?  In practice it seems it should be, but I don't quite see
	 * how to prove it.  If we run out, we might fail to find a workable wait
	 * queue rearrangement even though one exists.)  NOTE that this number
	 * limits the maximum recursion depth of DeadLockCheckRecurse. Making it
	 * really big might potentially allow a stack-overflow problem.
164 165 166 167 168 169
	 */
	maxCurConstraints = MaxBackends;
	curConstraints = (EDGE *) palloc(maxCurConstraints * sizeof(EDGE));

	/*
	 * Allow up to 3*MaxBackends constraints to be saved without having to
170 171 172 173 174
	 * re-run TestConfiguration.  (This is probably more than enough, but we
	 * can survive if we run low on space by doing excess runs of
	 * TestConfiguration to re-compute constraint lists each time needed.) The
	 * last MaxBackends entries in possibleConstraints[] are reserved as
	 * output workspace for FindLockCycle.
175 176 177 178 179 180 181 182 183 184 185 186 187
	 */
	maxPossibleConstraints = MaxBackends * 4;
	possibleConstraints =
		(EDGE *) palloc(maxPossibleConstraints * sizeof(EDGE));

	MemoryContextSwitchTo(oldcxt);
}

/*
 * DeadLockCheck -- Checks for deadlocks for a given process
 *
 * This code looks for deadlocks involving the given process.  If any
 * are found, it tries to rearrange lock wait queues to resolve the
188 189
 * deadlock.  If resolution is impossible, return DS_HARD_DEADLOCK ---
 * the caller is then expected to abort the given proc's transaction.
190
 *
191
 * Caller must already have locked all partitions of the lock tables.
192 193 194
 *
 * On failure, deadlock details are recorded in deadlockDetails[] for
 * subsequent printing by DeadLockReport().  That activity is separate
195 196
 * because (a) we don't want to do it while holding all those LWLocks,
 * and (b) we are typically invoked inside a signal handler.
197
 */
198
DeadLockState
Jan Wieck's avatar
Jan Wieck committed
199
DeadLockCheck(PGPROC *proc)
200 201 202 203 204 205 206 207 208 209 210
{
	int			i,
				j;

	/* Initialize to "no constraints" */
	nCurConstraints = 0;
	nPossibleConstraints = 0;
	nWaitOrders = 0;

	/* Search for deadlocks and possible fixes */
	if (DeadLockCheckRecurse(proc))
211 212 213 214 215
	{
		/*
		 * Call FindLockCycle one more time, to record the correct
		 * deadlockDetails[] for the basic state with no rearrangements.
		 */
Bruce Momjian's avatar
Bruce Momjian committed
216
		int			nSoftEdges;
217 218 219

		nWaitOrders = 0;
		if (!FindLockCycle(proc, possibleConstraints, &nSoftEdges))
220
			elog(FATAL, "deadlock seems to have disappeared");
221

222
		return DS_HARD_DEADLOCK;	/* cannot find a non-deadlocked state */
223
	}
224 225 226 227

	/* Apply any needed rearrangements of wait queues */
	for (i = 0; i < nWaitOrders; i++)
	{
228
		LOCK	   *lock = waitOrders[i].lock;
Jan Wieck's avatar
Jan Wieck committed
229
		PGPROC	  **procs = waitOrders[i].procs;
230
		int			nProcs = waitOrders[i].nProcs;
231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249
		PROC_QUEUE *waitQueue = &(lock->waitProcs);

		Assert(nProcs == waitQueue->size);

#ifdef DEBUG_DEADLOCK
		PrintLockQueue(lock, "DeadLockCheck:");
#endif

		/* Reset the queue and re-add procs in the desired order */
		ProcQueueInit(waitQueue);
		for (j = 0; j < nProcs; j++)
		{
			SHMQueueInsertBefore(&(waitQueue->links), &(procs[j]->links));
			waitQueue->size++;
		}

#ifdef DEBUG_DEADLOCK
		PrintLockQueue(lock, "rearranged to:");
#endif
250 251 252

		/* See if any waiters for the lock can be woken up now */
		ProcLockWakeup(GetLocksMethodTable(lock), lock);
253
	}
254

255
	/* Return code tells caller if we had to escape a deadlock or not */
256 257 258
	if (nWaitOrders > 0)
		return DS_SOFT_DEADLOCK;
	else
259
		return DS_NO_DEADLOCK;
260 261 262 263 264 265
}

/*
 * DeadLockCheckRecurse -- recursively search for valid orderings
 *
 * curConstraints[] holds the current set of constraints being considered
266
 * by an outer level of recursion.	Add to this each possible solution
267 268
 * constraint for any cycle detected at this level.
 *
269
 * Returns TRUE if no solution exists.	Returns FALSE if a deadlock-free
270 271 272 273
 * state is attainable, in which case waitOrders[] shows the required
 * rearrangements of lock wait queues (if any).
 */
static bool
Jan Wieck's avatar
Jan Wieck committed
274
DeadLockCheckRecurse(PGPROC *proc)
275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299
{
	int			nEdges;
	int			oldPossibleConstraints;
	bool		savedList;
	int			i;

	nEdges = TestConfiguration(proc);
	if (nEdges < 0)
		return true;			/* hard deadlock --- no solution */
	if (nEdges == 0)
		return false;			/* good configuration found */
	if (nCurConstraints >= maxCurConstraints)
		return true;			/* out of room for active constraints? */
	oldPossibleConstraints = nPossibleConstraints;
	if (nPossibleConstraints + nEdges + MaxBackends <= maxPossibleConstraints)
	{
		/* We can save the edge list in possibleConstraints[] */
		nPossibleConstraints += nEdges;
		savedList = true;
	}
	else
	{
		/* Not room; will need to regenerate the edges on-the-fly */
		savedList = false;
	}
300

301 302 303 304 305 306 307 308 309
	/*
	 * Try each available soft edge as an addition to the configuration.
	 */
	for (i = 0; i < nEdges; i++)
	{
		if (!savedList && i > 0)
		{
			/* Regenerate the list of possible added constraints */
			if (nEdges != TestConfiguration(proc))
310
				elog(FATAL, "inconsistent results during deadlock check");
311 312
		}
		curConstraints[nCurConstraints] =
313
			possibleConstraints[oldPossibleConstraints + i];
314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338
		nCurConstraints++;
		if (!DeadLockCheckRecurse(proc))
			return false;		/* found a valid solution! */
		/* give up on that added constraint, try again */
		nCurConstraints--;
	}
	nPossibleConstraints = oldPossibleConstraints;
	return true;				/* no solution found */
}


/*--------------------
 * Test a configuration (current set of constraints) for validity.
 *
 * Returns:
 *		0: the configuration is good (no deadlocks)
 *	   -1: the configuration has a hard deadlock or is not self-consistent
 *		>0: the configuration has one or more soft deadlocks
 *
 * In the soft-deadlock case, one of the soft cycles is chosen arbitrarily
 * and a list of its soft edges is returned beginning at
 * possibleConstraints+nPossibleConstraints.  The return value is the
 * number of soft edges.
 *--------------------
 */
339
static int
Jan Wieck's avatar
Jan Wieck committed
340
TestConfiguration(PGPROC *startProc)
341
{
342 343 344 345
	int			softFound = 0;
	EDGE	   *softEdges = possibleConstraints + nPossibleConstraints;
	int			nSoftEdges;
	int			i;
346 347 348 349 350 351

	/*
	 * Make sure we have room for FindLockCycle's output.
	 */
	if (nPossibleConstraints + MaxBackends > maxPossibleConstraints)
		return -1;
352

353 354 355 356 357 358
	/*
	 * Expand current constraint set into wait orderings.  Fail if the
	 * constraint set is not self-consistent.
	 */
	if (!ExpandConstraints(curConstraints, nCurConstraints))
		return -1;
359

360
	/*
361 362 363
	 * Check for cycles involving startProc or any of the procs mentioned in
	 * constraints.  We check startProc last because if it has a soft cycle
	 * still to be dealt with, we want to deal with that first.
364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394
	 */
	for (i = 0; i < nCurConstraints; i++)
	{
		if (FindLockCycle(curConstraints[i].waiter, softEdges, &nSoftEdges))
		{
			if (nSoftEdges == 0)
				return -1;		/* hard deadlock detected */
			softFound = nSoftEdges;
		}
		if (FindLockCycle(curConstraints[i].blocker, softEdges, &nSoftEdges))
		{
			if (nSoftEdges == 0)
				return -1;		/* hard deadlock detected */
			softFound = nSoftEdges;
		}
	}
	if (FindLockCycle(startProc, softEdges, &nSoftEdges))
	{
		if (nSoftEdges == 0)
			return -1;			/* hard deadlock detected */
		softFound = nSoftEdges;
	}
	return softFound;
}


/*
 * FindLockCycle -- basic check for deadlock cycles
 *
 * Scan outward from the given proc to see if there is a cycle in the
 * waits-for graph that includes this proc.  Return TRUE if a cycle
395
 * is found, else FALSE.  If a cycle is found, we return a list of
396
 * the "soft edges", if any, included in the cycle.  These edges could
397 398 399 400
 * potentially be eliminated by rearranging wait queues.  We also fill
 * deadlockDetails[] with information about the detected cycle; this info
 * is not used by the deadlock algorithm itself, only to print a useful
 * message after failing.
401 402 403
 *
 * Since we need to be able to check hypothetical configurations that would
 * exist after wait queue rearrangement, the routine pays attention to the
404
 * table of hypothetical queue orders in waitOrders[].	These orders will
405 406 407
 * be believed in preference to the actual ordering seen in the locktable.
 */
static bool
Jan Wieck's avatar
Jan Wieck committed
408
FindLockCycle(PGPROC *checkProc,
409 410 411 412
			  EDGE *softEdges,	/* output argument */
			  int *nSoftEdges)	/* output argument */
{
	nVisitedProcs = 0;
413
	nDeadlockDetails = 0;
414
	*nSoftEdges = 0;
415
	return FindLockCycleRecurse(checkProc, 0, softEdges, nSoftEdges);
416 417 418
}

static bool
Jan Wieck's avatar
Jan Wieck committed
419
FindLockCycleRecurse(PGPROC *checkProc,
420
					 int depth,
421 422 423
					 EDGE *softEdges,	/* output argument */
					 int *nSoftEdges)	/* output argument */
{
Jan Wieck's avatar
Jan Wieck committed
424
	PGPROC	   *proc;
425
	LOCK	   *lock;
426
	PROCLOCK   *proclock;
427
	SHM_QUEUE  *procLocks;
428
	LockMethod	lockMethodTable;
429 430 431 432 433 434 435 436 437 438 439 440 441 442 443 444
	PROC_QUEUE *waitQueue;
	int			queue_size;
	int			conflictMask;
	int			i;
	int			numLockModes,
				lm;

	/*
	 * Have we already seen this proc?
	 */
	for (i = 0; i < nVisitedProcs; i++)
	{
		if (visitedProcs[i] == checkProc)
		{
			/* If we return to starting point, we have a deadlock cycle */
			if (i == 0)
445 446
			{
				/*
447 448
				 * record total length of cycle --- outer levels will now fill
				 * deadlockDetails[]
449 450 451 452
				 */
				Assert(depth <= MaxBackends);
				nDeadlockDetails = depth;

453
				return true;
454
			}
455

456
			/*
457 458
			 * Otherwise, we have a cycle but it does not include the start
			 * point, so say "no deadlock".
459 460 461 462 463 464 465
			 */
			return false;
		}
	}
	/* Mark proc as seen */
	Assert(nVisitedProcs < MaxBackends);
	visitedProcs[nVisitedProcs++] = checkProc;
466

467 468 469 470 471 472 473 474 475
	/*
	 * If the proc is not waiting, we have no outgoing waits-for edges.
	 */
	if (checkProc->links.next == INVALID_OFFSET)
		return false;
	lock = checkProc->waitLock;
	if (lock == NULL)
		return false;
	lockMethodTable = GetLocksMethodTable(lock);
Bruce Momjian's avatar
Bruce Momjian committed
476 477
	numLockModes = lockMethodTable->numLockModes;
	conflictMask = lockMethodTable->conflictTab[checkProc->waitLockMode];
478

479
	/*
480 481
	 * Scan for procs that already hold conflicting locks.	These are "hard"
	 * edges in the waits-for graph.
482
	 */
483
	procLocks = &(lock->procLocks);
484

485
	proclock = (PROCLOCK *) SHMQueueNext(procLocks, procLocks,
Bruce Momjian's avatar
Bruce Momjian committed
486
										 offsetof(PROCLOCK, lockLink));
487

488
	while (proclock)
489
	{
490
		proc = proclock->tag.myProc;
491 492 493 494 495 496

		/* A proc never blocks itself */
		if (proc != checkProc)
		{
			for (lm = 1; lm <= numLockModes; lm++)
			{
497 498
				if ((proclock->holdMask & LOCKBIT_ON(lm)) &&
					(conflictMask & LOCKBIT_ON(lm)))
499 500
				{
					/* This proc hard-blocks checkProc */
Bruce Momjian's avatar
Bruce Momjian committed
501
					if (FindLockCycleRecurse(proc, depth + 1,
502 503 504
											 softEdges, nSoftEdges))
					{
						/* fill deadlockDetails[] */
Bruce Momjian's avatar
Bruce Momjian committed
505
						DEADLOCK_INFO *info = &deadlockDetails[depth];
506 507 508 509 510

						info->locktag = lock->tag;
						info->lockmode = checkProc->waitLockMode;
						info->pid = checkProc->pid;

511
						return true;
512
					}
513
					/* If no deadlock, we're done looking at this proclock */
514 515 516 517 518
					break;
				}
			}
		}

519
		proclock = (PROCLOCK *) SHMQueueNext(procLocks, &proclock->lockLink,
520
											 offsetof(PROCLOCK, lockLink));
521 522 523 524
	}

	/*
	 * Scan for procs that are ahead of this one in the lock's wait queue.
525 526 527
	 * Those that have conflicting requests soft-block this one.  This must be
	 * done after the hard-block search, since if another proc both hard- and
	 * soft-blocks this one, we want to call it a hard edge.
528
	 *
529 530
	 * If there is a proposed re-ordering of the lock's wait order, use that
	 * rather than the current wait order.
531 532 533 534 535 536 537 538 539 540
	 */
	for (i = 0; i < nWaitOrders; i++)
	{
		if (waitOrders[i].lock == lock)
			break;
	}

	if (i < nWaitOrders)
	{
		/* Use the given hypothetical wait queue order */
Jan Wieck's avatar
Jan Wieck committed
541
		PGPROC	  **procs = waitOrders[i].procs;
542 543 544 545 546 547 548 549 550 551 552 553 554 555 556

		queue_size = waitOrders[i].nProcs;

		for (i = 0; i < queue_size; i++)
		{
			proc = procs[i];

			/* Done when we reach the target proc */
			if (proc == checkProc)
				break;

			/* Is there a conflict with this guy's request? */
			if (((1 << proc->waitLockMode) & conflictMask) != 0)
			{
				/* This proc soft-blocks checkProc */
Bruce Momjian's avatar
Bruce Momjian committed
557
				if (FindLockCycleRecurse(proc, depth + 1,
558
										 softEdges, nSoftEdges))
559
				{
560
					/* fill deadlockDetails[] */
Bruce Momjian's avatar
Bruce Momjian committed
561
					DEADLOCK_INFO *info = &deadlockDetails[depth];
562 563 564 565 566

					info->locktag = lock->tag;
					info->lockmode = checkProc->waitLockMode;
					info->pid = checkProc->pid;

567
					/*
568
					 * Add this edge to the list of soft edges in the cycle
569
					 */
570 571 572 573 574 575 576 577 578 579 580 581 582 583 584
					Assert(*nSoftEdges < MaxBackends);
					softEdges[*nSoftEdges].waiter = checkProc;
					softEdges[*nSoftEdges].blocker = proc;
					(*nSoftEdges)++;
					return true;
				}
			}
		}
	}
	else
	{
		/* Use the true lock wait queue order */
		waitQueue = &(lock->waitProcs);
		queue_size = waitQueue->size;

Jan Wieck's avatar
Jan Wieck committed
585
		proc = (PGPROC *) MAKE_PTR(waitQueue->links.next);
586 587 588 589 590 591 592 593 594 595 596

		while (queue_size-- > 0)
		{
			/* Done when we reach the target proc */
			if (proc == checkProc)
				break;

			/* Is there a conflict with this guy's request? */
			if (((1 << proc->waitLockMode) & conflictMask) != 0)
			{
				/* This proc soft-blocks checkProc */
Bruce Momjian's avatar
Bruce Momjian committed
597
				if (FindLockCycleRecurse(proc, depth + 1,
598
										 softEdges, nSoftEdges))
599
				{
600
					/* fill deadlockDetails[] */
Bruce Momjian's avatar
Bruce Momjian committed
601
					DEADLOCK_INFO *info = &deadlockDetails[depth];
602 603 604 605 606

					info->locktag = lock->tag;
					info->lockmode = checkProc->waitLockMode;
					info->pid = checkProc->pid;

607
					/*
608
					 * Add this edge to the list of soft edges in the cycle
609
					 */
610 611 612 613 614 615 616 617
					Assert(*nSoftEdges < MaxBackends);
					softEdges[*nSoftEdges].waiter = checkProc;
					softEdges[*nSoftEdges].blocker = proc;
					(*nSoftEdges)++;
					return true;
				}
			}

Jan Wieck's avatar
Jan Wieck committed
618
			proc = (PGPROC *) MAKE_PTR(proc->links.next);
619 620 621 622 623 624 625 626 627 628 629 630 631 632 633
		}
	}

	/*
	 * No conflict detected here.
	 */
	return false;
}


/*
 * ExpandConstraints -- expand a list of constraints into a set of
 *		specific new orderings for affected wait queues
 *
 * Input is a list of soft edges to be reversed.  The output is a list
Jan Wieck's avatar
Jan Wieck committed
634
 * of nWaitOrders WAIT_ORDER structs in waitOrders[], with PGPROC array
635 636 637 638 639 640 641 642 643 644 645 646 647 648
 * workspace in waitOrderProcs[].
 *
 * Returns TRUE if able to build an ordering that satisfies all the
 * constraints, FALSE if not (there are contradictory constraints).
 */
static bool
ExpandConstraints(EDGE *constraints,
				  int nConstraints)
{
	int			nWaitOrderProcs = 0;
	int			i,
				j;

	nWaitOrders = 0;
649

650
	/*
651
	 * Scan constraint list backwards.	This is because the last-added
652 653
	 * constraint is the only one that could fail, and so we want to test it
	 * for inconsistency first.
654
	 */
655
	for (i = nConstraints; --i >= 0;)
656
	{
Jan Wieck's avatar
Jan Wieck committed
657
		PGPROC	   *proc = constraints[i].waiter;
658
		LOCK	   *lock = proc->waitLock;
659 660

		/* Did we already make a list for this lock? */
661
		for (j = nWaitOrders; --j >= 0;)
662 663 664 665 666 667 668 669 670 671 672 673
		{
			if (waitOrders[j].lock == lock)
				break;
		}
		if (j >= 0)
			continue;
		/* No, so allocate a new list */
		waitOrders[nWaitOrders].lock = lock;
		waitOrders[nWaitOrders].procs = waitOrderProcs + nWaitOrderProcs;
		waitOrders[nWaitOrders].nProcs = lock->waitProcs.size;
		nWaitOrderProcs += lock->waitProcs.size;
		Assert(nWaitOrderProcs <= MaxBackends);
674

675
		/*
676 677
		 * Do the topo sort.  TopoSort need not examine constraints after this
		 * one, since they must be for different locks.
678
		 */
679
		if (!TopoSort(lock, constraints, i + 1,
680 681 682 683 684 685 686 687 688 689 690 691 692 693 694 695 696 697 698 699 700 701 702
					  waitOrders[nWaitOrders].procs))
			return false;
		nWaitOrders++;
	}
	return true;
}


/*
 * TopoSort -- topological sort of a wait queue
 *
 * Generate a re-ordering of a lock's wait queue that satisfies given
 * constraints about certain procs preceding others.  (Each such constraint
 * is a fact of a partial ordering.)  Minimize rearrangement of the queue
 * not needed to achieve the partial ordering.
 *
 * This is a lot simpler and slower than, for example, the topological sort
 * algorithm shown in Knuth's Volume 1.  However, Knuth's method doesn't
 * try to minimize the damage to the existing order.  In practice we are
 * not likely to be working with more than a few constraints, so the apparent
 * slowness of the algorithm won't really matter.
 *
 * The initial queue ordering is taken directly from the lock's wait queue.
Jan Wieck's avatar
Jan Wieck committed
703
 * The output is an array of PGPROC pointers, of length equal to the lock's
704
 * wait queue length (the caller is responsible for providing this space).
705
 * The partial order is specified by an array of EDGE structs.	Each EDGE
706 707 708 709 710 711 712 713 714 715 716
 * is one that we need to reverse, therefore the "waiter" must appear before
 * the "blocker" in the output array.  The EDGE array may well contain
 * edges associated with other locks; these should be ignored.
 *
 * Returns TRUE if able to build an ordering that satisfies all the
 * constraints, FALSE if not (there are contradictory constraints).
 */
static bool
TopoSort(LOCK *lock,
		 EDGE *constraints,
		 int nConstraints,
Jan Wieck's avatar
Jan Wieck committed
717
		 PGPROC **ordering)		/* output argument */
718 719 720
{
	PROC_QUEUE *waitQueue = &(lock->waitProcs);
	int			queue_size = waitQueue->size;
Jan Wieck's avatar
Jan Wieck committed
721
	PGPROC	   *proc;
722 723 724 725 726 727
	int			i,
				j,
				k,
				last;

	/* First, fill topoProcs[] array with the procs in their current order */
Jan Wieck's avatar
Jan Wieck committed
728
	proc = (PGPROC *) MAKE_PTR(waitQueue->links.next);
729 730 731
	for (i = 0; i < queue_size; i++)
	{
		topoProcs[i] = proc;
Jan Wieck's avatar
Jan Wieck committed
732
		proc = (PGPROC *) MAKE_PTR(proc->links.next);
733 734 735
	}

	/*
736 737 738 739 740 741 742 743
	 * Scan the constraints, and for each proc in the array, generate a count
	 * of the number of constraints that say it must be before something else,
	 * plus a list of the constraints that say it must be after something
	 * else. The count for the j'th proc is stored in beforeConstraints[j],
	 * and the head of its list in afterConstraints[j].  Each constraint
	 * stores its list link in constraints[i].link (note any constraint will
	 * be in just one list). The array index for the before-proc of the i'th
	 * constraint is remembered in constraints[i].pred.
744 745 746 747 748 749 750 751 752 753
	 */
	MemSet(beforeConstraints, 0, queue_size * sizeof(int));
	MemSet(afterConstraints, 0, queue_size * sizeof(int));
	for (i = 0; i < nConstraints; i++)
	{
		proc = constraints[i].waiter;
		/* Ignore constraint if not for this lock */
		if (proc->waitLock != lock)
			continue;
		/* Find the waiter proc in the array */
754
		for (j = queue_size; --j >= 0;)
755 756 757 758 759 760 761
		{
			if (topoProcs[j] == proc)
				break;
		}
		Assert(j >= 0);			/* should have found a match */
		/* Find the blocker proc in the array */
		proc = constraints[i].blocker;
762
		for (k = queue_size; --k >= 0;)
763 764 765 766 767
		{
			if (topoProcs[k] == proc)
				break;
		}
		Assert(k >= 0);			/* should have found a match */
768
		beforeConstraints[j]++; /* waiter must come before */
769 770 771
		/* add this constraint to list of after-constraints for blocker */
		constraints[i].pred = j;
		constraints[i].link = afterConstraints[k];
772
		afterConstraints[k] = i + 1;
773 774
	}
	/*--------------------
775
	 * Now scan the topoProcs array backwards.	At each step, output the
776 777 778 779 780 781 782 783 784
	 * last proc that has no remaining before-constraints, and decrease
	 * the beforeConstraints count of each of the procs it was constrained
	 * against.
	 * i = index of ordering[] entry we want to output this time
	 * j = search index for topoProcs[]
	 * k = temp for scanning constraint list for proc j
	 * last = last non-null index in topoProcs (avoid redundant searches)
	 *--------------------
	 */
785 786
	last = queue_size - 1;
	for (i = queue_size; --i >= 0;)
787 788 789 790 791 792 793 794 795 796 797 798 799 800 801 802
	{
		/* Find next candidate to output */
		while (topoProcs[last] == NULL)
			last--;
		for (j = last; j >= 0; j--)
		{
			if (topoProcs[j] != NULL && beforeConstraints[j] == 0)
				break;
		}
		/* If no available candidate, topological sort fails */
		if (j < 0)
			return false;
		/* Output candidate, and mark it done by zeroing topoProcs[] entry */
		ordering[i] = topoProcs[j];
		topoProcs[j] = NULL;
		/* Update beforeConstraints counts of its predecessors */
803 804
		for (k = afterConstraints[j]; k > 0; k = constraints[k - 1].link)
			beforeConstraints[constraints[k - 1].pred]--;
805 806 807 808 809 810 811 812 813 814 815 816
	}

	/* Done */
	return true;
}

#ifdef DEBUG_DEADLOCK
static void
PrintLockQueue(LOCK *lock, const char *info)
{
	PROC_QUEUE *waitQueue = &(lock->waitProcs);
	int			queue_size = waitQueue->size;
Jan Wieck's avatar
Jan Wieck committed
817
	PGPROC	   *proc;
818 819 820
	int			i;

	printf("%s lock %lx queue ", info, MAKE_OFFSET(lock));
Jan Wieck's avatar
Jan Wieck committed
821
	proc = (PGPROC *) MAKE_PTR(waitQueue->links.next);
822 823 824
	for (i = 0; i < queue_size; i++)
	{
		printf(" %d", proc->pid);
Jan Wieck's avatar
Jan Wieck committed
825
		proc = (PGPROC *) MAKE_PTR(proc->links.next);
826 827 828 829 830
	}
	printf("\n");
	fflush(stdout);
}
#endif
831

832
/*
833
 * Report a detected deadlock, with available details.
834 835 836 837
 */
void
DeadLockReport(void)
{
Bruce Momjian's avatar
Bruce Momjian committed
838
	StringInfoData buf;
839
	StringInfoData buf2;
Bruce Momjian's avatar
Bruce Momjian committed
840
	int			i;
841

842
	initStringInfo(&buf);
843 844
	initStringInfo(&buf2);

845 846
	for (i = 0; i < nDeadlockDetails; i++)
	{
Bruce Momjian's avatar
Bruce Momjian committed
847
		DEADLOCK_INFO *info = &deadlockDetails[i];
848 849 850
		int			nextpid;

		/* The last proc waits for the first one... */
Bruce Momjian's avatar
Bruce Momjian committed
851
		if (i < nDeadlockDetails - 1)
852 853 854 855
			nextpid = info[1].pid;
		else
			nextpid = deadlockDetails[0].pid;

856 857 858
		if (i > 0)
			appendStringInfoChar(&buf, '\n');

859
		/* reset buf2 to hold next object description */
860
		resetStringInfo(&buf2);
861 862 863 864

		DescribeLockTag(&buf2, &info->locktag);

		appendStringInfo(&buf,
865
				  _("Process %d waits for %s on %s; blocked by process %d."),
866
						 info->pid,
867 868
						 GetLockmodeName(info->locktag.locktag_lockmethodid,
										 info->lockmode),
869 870
						 buf2.data,
						 nextpid);
871
	}
872 873 874 875
	ereport(ERROR,
			(errcode(ERRCODE_T_R_DEADLOCK_DETECTED),
			 errmsg("deadlock detected"),
			 errdetail("%s", buf.data)));
876 877 878 879 880 881 882 883 884 885 886 887 888
}

/*
 * RememberSimpleDeadLock: set up info for DeadLockReport when ProcSleep
 * detects a trivial (two-way) deadlock.  proc1 wants to block for lockmode
 * on lock, but proc2 is already waiting and would be blocked by proc1.
 */
void
RememberSimpleDeadLock(PGPROC *proc1,
					   LOCKMODE lockmode,
					   LOCK *lock,
					   PGPROC *proc2)
{
Bruce Momjian's avatar
Bruce Momjian committed
889
	DEADLOCK_INFO *info = &deadlockDetails[0];
890 891 892 893 894 895 896 897 898 899

	info->locktag = lock->tag;
	info->lockmode = lockmode;
	info->pid = proc1->pid;
	info++;
	info->locktag = proc2->waitLock->tag;
	info->lockmode = proc2->waitLockMode;
	info->pid = proc2->pid;
	nDeadlockDetails = 2;
}