/*-------------------------------------------------------------------------
 *
 * lock.c--
 *	  simple lock acquisition
 *
 * Copyright (c) 1994, Regents of the University of California
 *
 *
 * IDENTIFICATION
 *	  $Header: /cvsroot/pgsql/src/backend/storage/lmgr/lock.c,v 1.22 1998/01/27 03:00:28 momjian Exp $
 *
 * NOTES
 *	  Outside modules can create a lock table and acquire/release
 *	  locks.  A lock table is a shared memory hash table.  When
 *	  a process tries to acquire a lock of a type that conflicts
 *	  with existing locks, it is put to sleep using the routines
 *	  in storage/lmgr/proc.c.
 *
 *	Interface:
 *
 *	LockAcquire(), LockRelease(), LockTabInit().
 *
 *	LockReplace() is called only within this module and by the
 *		lkchain module.  It releases a lock without looking
 *		the lock up in the lock table.
 *
 *	NOTE: This module is used to define new lock tables.  The
 *		multi-level lock table (multi.c) used by the heap
 *		access methods calls these routines.  See multi.c for
 *		examples showing how to use this interface.
 *
 *-------------------------------------------------------------------------
 */
34
#include <stdio.h>				/* for sprintf() */
Bruce Momjian's avatar
Bruce Momjian committed
35
#include <string.h>
36 37
#include <sys/types.h>
#include <unistd.h>
Bruce Momjian's avatar
Bruce Momjian committed
38

Marc G. Fournier's avatar
Marc G. Fournier committed
39
#include "postgres.h"
Bruce Momjian's avatar
Bruce Momjian committed
40
#include "miscadmin.h"
41 42 43 44
#include "storage/shmem.h"
#include "storage/spin.h"
#include "storage/proc.h"
#include "storage/lock.h"
Bruce Momjian's avatar
Bruce Momjian committed
45
#include "utils/dynahash.h"
46
#include "utils/hsearch.h"
Bruce Momjian's avatar
Bruce Momjian committed
47
#include "utils/memutils.h"
48 49
#include "utils/palloc.h"
#include "access/xact.h"
50
#include "access/transam.h"
51

Bruce Momjian's avatar
Bruce Momjian committed
52
static int WaitOnLock(LOCKTAB *ltable, LockTableId tableId, LOCK *lock,
53
		   LOCKT lockt);
54
		   
55 56 57 58 59 60
/*#define LOCK_MGR_DEBUG*/

#ifndef LOCK_MGR_DEBUG

#define LOCK_PRINT(where,tag,type)
#define LOCK_DUMP(where,lock,type)
61
#define LOCK_DUMP_AUX(where,lock,type)
62 63
#define XID_PRINT(where,xidentP)

64 65
#else							/* LOCK_MGR_DEBUG */

66 67 68
int			lockDebug = 0;
unsigned int lock_debug_oid_min = BootstrapObjectIdData;
static char *lock_types[] = {
69 70 71 72 73 74
	"NONE",
	"WRITE",
	"READ",
	"WRITE INTENT",
	"READ INTENT",
	"EXTEND"
75
};
76

77
#define LOCK_PRINT(where,tag,type)\
78 79 80
	if ((lockDebug >= 1) && (tag->relId >= lock_debug_oid_min)) \
		elog(DEBUG, \
			 "%s: pid (%d) rel (%d) dbid (%d) tid (%d,%d) type (%s)",where, \
Bruce Momjian's avatar
Bruce Momjian committed
81
			 MyProcPid,\
82 83 84 85 86
			 tag->relId, tag->dbId, \
			 ((tag->tupleId.ip_blkid.bi_hi<<16)+\
			  tag->tupleId.ip_blkid.bi_lo),\
			 tag->tupleId.ip_posid, \
			 lock_types[type])
87 88

#define LOCK_DUMP(where,lock,type)\
89 90
	if ((lockDebug >= 1) && (lock->tag.relId >= lock_debug_oid_min)) \
		LOCK_DUMP_AUX(where,lock,type)
91 92

#define LOCK_DUMP_AUX(where,lock,type)\
93 94 95
		elog(DEBUG, \
			 "%s: pid (%d) rel (%d) dbid (%d) tid (%d,%d) nHolding (%d) "\
			 "holders (%d,%d,%d,%d,%d) type (%s)",where, \
Bruce Momjian's avatar
Bruce Momjian committed
96
			 MyProcPid,\
97 98 99 100 101 102 103 104 105 106 107
			 lock->tag.relId, lock->tag.dbId, \
			 ((lock->tag.tupleId.ip_blkid.bi_hi<<16)+\
			  lock->tag.tupleId.ip_blkid.bi_lo),\
			 lock->tag.tupleId.ip_posid, \
			 lock->nHolding,\
			 lock->holders[1],\
			 lock->holders[2],\
			 lock->holders[3],\
			 lock->holders[4],\
			 lock->holders[5],\
			 lock_types[type])
108 109

#define XID_PRINT(where,xidentP)\
110 111 112 113 114 115 116
	if ((lockDebug >= 2) && \
		(((LOCK *)MAKE_PTR(xidentP->tag.lock))->tag.relId \
		 >= lock_debug_oid_min)) \
		elog(DEBUG,\
			 "%s: pid (%d) xid (%d) pid (%d) lock (%x) nHolding (%d) "\
			 "holders (%d,%d,%d,%d,%d)",\
			 where,\
Bruce Momjian's avatar
Bruce Momjian committed
117
			 MyProcPid,\
118 119 120 121 122 123 124 125 126 127 128 129
			 xidentP->tag.xid,\
			 xidentP->tag.pid,\
			 xidentP->tag.lock,\
			 xidentP->nHolding,\
			 xidentP->holders[1],\
			 xidentP->holders[2],\
			 xidentP->holders[3],\
			 xidentP->holders[4],\
			 xidentP->holders[5])

#endif							/* LOCK_MGR_DEBUG */

130
SPINLOCK	LockMgrLock;		/* in Shmem or created in
131
								 * CreateSpinlocks() */
132 133 134

/* This is to simplify/speed up some bit arithmetic */

135 136
static MASK BITS_OFF[MAX_LOCKTYPES];
static MASK BITS_ON[MAX_LOCKTYPES];
137 138 139 140 141

/* -----------------
 * XXX Want to move this to this file
 * -----------------
 */
142
static bool LockingIsDisabled;
143 144 145 146 147 148 149 150 151 152 153

/* -------------------
 * map from tableId to the lock table structure
 * -------------------
 */
static LOCKTAB *AllTables[MAX_TABLES];

/* -------------------
 * no zero-th table
 * -------------------
 */
154
static int	NumTables = 1;
155 156 157

/* -------------------
 * InitLocks -- Init the lock module.  Create a private data
158
 *		structure for constructing conflict masks.
159 160 161 162 163
 * -------------------
 */
void
InitLocks()
{
164 165
	int			i;
	int			bit;
166 167 168 169 170 171 172

	bit = 1;
	/* -------------------
	 * remember 0th locktype is invalid
	 * -------------------
	 */
	for (i = 0; i < MAX_LOCKTYPES; i++, bit <<= 1)
173
	{
174 175
		BITS_ON[i] = bit;
		BITS_OFF[i] = ~bit;
176 177 178 179 180 181 182 183 184 185
	}
}

/* -------------------
 * LockDisable -- sets LockingIsDisabled flag to TRUE or FALSE.
 * ------------------
 */
void
LockDisable(int status)
{
186
	LockingIsDisabled = status;
187 188 189 190 191
}


/*
 * LockTypeInit -- initialize the lock table's lock type
192
 *		structures
193 194 195 196
 *
 * Notes: just copying.  Should only be called once.
 */
static void
197 198
LockTypeInit(LOCKTAB *ltable,
			 MASK *conflictsP,
199 200
			 int *prioP,
			 int ntypes)
201
{
202
	int			i;
203 204 205 206

	ltable->ctl->nLockTypes = ntypes;
	ntypes++;
	for (i = 0; i < ntypes; i++, prioP++, conflictsP++)
207
	{
208 209
		ltable->ctl->conflictTab[i] = *conflictsP;
		ltable->ctl->prio[i] = *prioP;
210 211 212 213 214 215 216
	}
}

/*
 * LockTabInit -- initialize a lock table structure
 *
 * Notes:
217 218 219 220
 *		(a) a lock table has four separate entries in the binding
 *		table.	This is because every shared hash table and spinlock
 *		has its name stored in the binding table at its creation.  It
 *		is wasteful, in this case, but not much space is involved.
221 222 223 224
 *
 */
LockTableId
LockTabInit(char *tabName,
225
			MASK *conflictsP,
226 227
			int *prioP,
			int ntypes)
228
{
229 230 231 232 233 234
	LOCKTAB    *ltable;
	char	   *shmemName;
	HASHCTL		info;
	int			hash_flags;
	bool		found;
	int			status = TRUE;
235 236

	if (ntypes > MAX_LOCKTYPES)
237
	{
238 239 240
		elog(NOTICE, "LockTabInit: too many lock types %d greater than %d",
			 ntypes, MAX_LOCKTYPES);
		return (INVALID_TABLEID);
241
	}
242 243

	if (NumTables > MAX_TABLES)
244
	{
245 246 247 248
		elog(NOTICE,
			 "LockTabInit: system limit of MAX_TABLES (%d) lock tables",
			 MAX_TABLES);
		return (INVALID_TABLEID);
249
	}
250 251 252 253

	/* allocate a string for the binding table lookup */
	shmemName = (char *) palloc((unsigned) (strlen(tabName) + 32));
	if (!shmemName)
254
	{
255 256
		elog(NOTICE, "LockTabInit: couldn't malloc string %s \n", tabName);
		return (INVALID_TABLEID);
257
	}
258 259 260 261

	/* each lock table has a non-shared header */
	ltable = (LOCKTAB *) palloc((unsigned) sizeof(LOCKTAB));
	if (!ltable)
262
	{
263 264 265
		elog(NOTICE, "LockTabInit: couldn't malloc lock table %s\n", tabName);
		pfree(shmemName);
		return (INVALID_TABLEID);
266
	}
267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284

	/* ------------------------
	 * find/acquire the spinlock for the table
	 * ------------------------
	 */
	SpinAcquire(LockMgrLock);


	/* -----------------------
	 * allocate a control structure from shared memory or attach to it
	 * if it already exists.
	 * -----------------------
	 */
	sprintf(shmemName, "%s (ctl)", tabName);
	ltable->ctl = (LOCKCTL *)
		ShmemInitStruct(shmemName, (unsigned) sizeof(LOCKCTL), &found);

	if (!ltable->ctl)
285
	{
286 287
		elog(FATAL, "LockTabInit: couldn't initialize %s", tabName);
		status = FALSE;
288
	}
289 290 291 292 293 294

	/* ----------------
	 * we're first - initialize
	 * ----------------
	 */
	if (!found)
295
	{
Bruce Momjian's avatar
Bruce Momjian committed
296
		MemSet(ltable->ctl, 0, sizeof(LOCKCTL));
297 298
		ltable->ctl->masterLock = LockMgrLock;
		ltable->ctl->tableId = NumTables;
299
	}
300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325

	/* --------------------
	 * other modules refer to the lock table by a tableId
	 * --------------------
	 */
	AllTables[NumTables] = ltable;
	NumTables++;
	Assert(NumTables <= MAX_TABLES);

	/* ----------------------
	 * allocate a hash table for the lock tags.  This is used
	 * to find the different locks.
	 * ----------------------
	 */
	info.keysize = sizeof(LOCKTAG);
	info.datasize = sizeof(LOCK);
	info.hash = tag_hash;
	hash_flags = (HASH_ELEM | HASH_FUNCTION);

	sprintf(shmemName, "%s (lock hash)", tabName);
	ltable->lockHash = (HTAB *) ShmemInitHash(shmemName,
										 INIT_TABLE_SIZE, MAX_TABLE_SIZE,
											  &info, hash_flags);

	Assert(ltable->lockHash->hash == tag_hash);
	if (!ltable->lockHash)
326
	{
327 328
		elog(FATAL, "LockTabInit: couldn't initialize %s", tabName);
		status = FALSE;
329
	}
330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346

	/* -------------------------
	 * allocate an xid table.  When different transactions hold
	 * the same lock, additional information must be saved (locks per tx).
	 * -------------------------
	 */
	info.keysize = XID_TAGSIZE;
	info.datasize = sizeof(XIDLookupEnt);
	info.hash = tag_hash;
	hash_flags = (HASH_ELEM | HASH_FUNCTION);

	sprintf(shmemName, "%s (xid hash)", tabName);
	ltable->xidHash = (HTAB *) ShmemInitHash(shmemName,
										 INIT_TABLE_SIZE, MAX_TABLE_SIZE,
											 &info, hash_flags);

	if (!ltable->xidHash)
347
	{
348 349
		elog(FATAL, "LockTabInit: couldn't initialize %s", tabName);
		status = FALSE;
350
	}
351 352 353 354 355 356 357 358 359 360 361 362

	/* init ctl data structures */
	LockTypeInit(ltable, conflictsP, prioP, ntypes);

	SpinRelease(LockMgrLock);

	pfree(shmemName);

	if (status)
		return (ltable->ctl->tableId);
	else
		return (INVALID_TABLEID);
363 364 365 366
}

/*
 * LockTabRename -- allocate another tableId to the same
367
 *		lock table.
368 369
 *
 * NOTES: Both the lock module and the lock chain (lchain.c)
370 371 372 373 374 375
 *		module use table id's to distinguish between different
 *		kinds of locks.  Short term and long term locks look
 *		the same to the lock table, but are handled differently
 *		by the lock chain manager.	This function allows the
 *		client to use different tableIds when acquiring/releasing
 *		short term and long term locks.
376
 */
377
#ifdef NOT_USED
378 379 380
LockTableId
LockTabRename(LockTableId tableId)
{
381
	LockTableId newTableId;
382 383

	if (NumTables >= MAX_TABLES)
384
	{
385
		return (INVALID_TABLEID);
386
	}
387
	if (AllTables[tableId] == INVALID_TABLEID)
388
	{
389
		return (INVALID_TABLEID);
390
	}
391 392 393 394 395 396 397

	/* other modules refer to the lock table by a tableId */
	newTableId = NumTables;
	NumTables++;

	AllTables[newTableId] = AllTables[tableId];
	return (newTableId);
398
}
399

400
#endif
401 402 403

/*
 * LockAcquire -- Check for lock conflicts, sleep if conflict found,
404
 *		set lock if/when no conflicts.
405 406 407 408
 *
 * Returns: TRUE if parameters are correct, FALSE otherwise.
 *
 * Side Effects: The lock is always acquired.  No way to abort
409 410
 *		a lock acquisition other than aborting the transaction.
 *		Lock is recorded in the lkchain.
411
#ifdef USER_LOCKS
412 413 414 415 416 417 418 419 420 421 422 423 424 425 426 427 428 429 430 431 432 433 434 435 436
 * Note on User Locks:
 *		User locks are handled totally on the application side as
 *		long term cooperative locks which extend beyond the normal
 *		transaction boundaries.  Their purpose is to indicate to an
 *		application that someone is `working' on an item.  So it is
 *		possible to put an user lock on a tuple's oid, retrieve the
 *		tuple, work on it for an hour and then update it and remove
 *		the lock.  While the lock is active other clients can still
 *		read and write the tuple but they can be aware that it has
 *		been locked at the application level by someone.
 *		User locks use lock tags made of an uint16 and an uint32, for
 *		example 0 and a tuple oid, or any other arbitrary pair of
 *		numbers following a convention established by the application.
 *		In this sense tags don't refer to tuples or database entities.
 *		User locks and normal locks are completely orthogonal and
 *		they don't interfere with each other, so it is possible
 *		to acquire a normal lock on an user-locked tuple or user-lock
 *		a tuple for which a normal write lock already exists.
 *		User locks are always non blocking, therefore they are never
 *		acquired if already held by another process.  They must be
 *		released explicitly by the application but they are released
 *		automatically when a backend terminates.
 *		They are indicated by a dummy tableId 0 which doesn't have
 *		any table allocated but uses the normal lock table, and are
 *		distinguished from normal locks for the following differences:
437
 *
438
 *										normal lock		user lock
439
 *
440 441 442 443 444 445 446
 *		tableId							1				0
 *		tag.relId						rel oid			0
 *		tag.ItemPointerData.ip_blkid	block id		lock id2
 *		tag.ItemPointerData.ip_posid	tuple offset	lock id1
 *		xid.pid							0				backend pid
 *		xid.xid							current xid		0
 *		persistence						transaction		user or backend
447
 *
448 449
 *		The lockt parameter can have the same values for normal locks
 *		although probably only WRITE_LOCK can have some practical use.
450
 *
451
 *														DZ - 4 Oct 1996
452
#endif
453
 */
454

455
bool
456
LockAcquire(LockTableId tableId, LOCKTAG *lockName, LOCKT lockt)
457
{
458 459 460 461 462 463 464 465 466
	XIDLookupEnt *result,
				item;
	HTAB	   *xidTable;
	bool		found;
	LOCK	   *lock = NULL;
	SPINLOCK	masterLock;
	LOCKTAB    *ltable;
	int			status;
	TransactionId myXid;
467

468
#ifdef USER_LOCKS
469
	int			is_user_lock;
470

471 472 473 474
	is_user_lock = (tableId == 0);
	if (is_user_lock)
	{
		tableId = 1;
475
#ifdef USER_LOCKS_DEBUG
476 477 478 479 480
		elog(NOTICE, "LockAcquire: user lock tag [%u,%u] %d",
			 lockName->tupleId.ip_posid,
			 ((lockName->tupleId.ip_blkid.bi_hi << 16) +
			  lockName->tupleId.ip_blkid.bi_lo),
			 lockt);
481
#endif
482
	}
483 484
#endif

485 486 487
	Assert(tableId < NumTables);
	ltable = AllTables[tableId];
	if (!ltable)
488
	{
489 490
		elog(NOTICE, "LockAcquire: bad lock table %d", tableId);
		return (FALSE);
491
	}
492 493

	if (LockingIsDisabled)
494
	{
495
		return (TRUE);
496
	}
497 498 499 500 501 502 503 504 505 506

	LOCK_PRINT("Acquire", lockName, lockt);
	masterLock = ltable->ctl->masterLock;

	SpinAcquire(masterLock);

	Assert(ltable->lockHash->hash == tag_hash);
	lock = (LOCK *) hash_search(ltable->lockHash, (Pointer) lockName, HASH_ENTER, &found);

	if (!lock)
507
	{
508 509 510
		SpinRelease(masterLock);
		elog(FATAL, "LockAcquire: lock table %d is corrupted", tableId);
		return (FALSE);
511
	}
512 513 514 515 516 517

	/* --------------------
	 * if there was nothing else there, complete initialization
	 * --------------------
	 */
	if (!found)
518
	{
519 520
		lock->mask = 0;
		ProcQueueInit(&(lock->waitProcs));
Bruce Momjian's avatar
Bruce Momjian committed
521 522
		MemSet((char *) lock->holders, 0, sizeof(int) * MAX_LOCKTYPES);
		MemSet((char *) lock->activeHolders, 0, sizeof(int) * MAX_LOCKTYPES);
523 524 525 526 527 528
		lock->nHolding = 0;
		lock->nActive = 0;

		Assert(BlockIdEquals(&(lock->tag.tupleId.ip_blkid),
							 &(lockName->tupleId.ip_blkid)));

529
	}
530 531 532 533 534 535 536 537 538 539 540 541 542 543

	/* ------------------
	 * add an element to the lock queue so that we can clear the
	 * locks at end of transaction.
	 * ------------------
	 */
	xidTable = ltable->xidHash;
	myXid = GetCurrentTransactionId();

	/* ------------------
	 * Zero out all of the tag bytes (this clears the padding bytes for long
	 * word alignment and ensures hashing consistency).
	 * ------------------
	 */
Bruce Momjian's avatar
Bruce Momjian committed
544
	MemSet(&item, 0, XID_TAGSIZE); /* must clear padding, needed */
545 546
	TransactionIdStore(myXid, &item.tag.xid);
	item.tag.lock = MAKE_OFFSET(lock);
547
#if 0
548
	item.tag.pid = MyPid;
549
#endif
550

551
#ifdef USER_LOCKS
552 553
	if (is_user_lock)
	{
Bruce Momjian's avatar
Bruce Momjian committed
554
		item.tag.pid = MyProcPid;
555
		item.tag.xid = myXid = 0;
556
#ifdef USER_LOCKS_DEBUG
557 558
		elog(NOTICE, "LockAcquire: user lock xid [%d,%d,%d]",
			 item.tag.lock, item.tag.pid, item.tag.xid);
559
#endif
560
	}
561 562
#endif

563
	result = (XIDLookupEnt *) hash_search(xidTable, (Pointer) &item, HASH_ENTER, &found);
564
	if (!result)
565
	{
566 567
		elog(NOTICE, "LockAcquire: xid table corrupted");
		return (STATUS_ERROR);
568
	}
569
	if (!found)
570
	{
571 572 573
		XID_PRINT("LockAcquire: queueing XidEnt", result);
		ProcAddLock(&result->queue);
		result->nHolding = 0;
Bruce Momjian's avatar
Bruce Momjian committed
574
		MemSet((char *) result->holders, 0, sizeof(int) * MAX_LOCKTYPES);
575
	}
576 577 578 579 580 581 582 583 584 585 586 587 588 589 590 591 592 593

	/* ----------------
	 * lock->nholding tells us how many processes have _tried_ to
	 * acquire this lock,  Regardless of whether they succeeded or
	 * failed in doing so.
	 * ----------------
	 */
	lock->nHolding++;
	lock->holders[lockt]++;

	/* --------------------
	 * If I'm the only one holding a lock, then there
	 * cannot be a conflict.  Need to subtract one from the
	 * lock's count since we just bumped the count up by 1
	 * above.
	 * --------------------
	 */
	if (result->nHolding == lock->nActive)
594
	{
595 596 597 598 599
		result->holders[lockt]++;
		result->nHolding++;
		GrantLock(lock, lockt);
		SpinRelease(masterLock);
		return (TRUE);
600
	}
601 602 603 604 605 606

	Assert(result->nHolding <= lock->nActive);

	status = LockResolveConflicts(ltable, lock, lockt, myXid);

	if (status == STATUS_OK)
607
	{
608
		GrantLock(lock, lockt);
609
	}
610
	else if (status == STATUS_FOUND)
611
	{
612
#ifdef USER_LOCKS
613 614 615 616 617 618 619 620 621 622

		/*
		 * User locks are non blocking. If we can't acquire a lock remove
		 * the xid entry and return FALSE without waiting.
		 */
		if (is_user_lock)
		{
			if (!result->nHolding)
			{
				SHMQueueDelete(&result->queue);
623
				hash_search(xidTable, (Pointer) &item, HASH_REMOVE, &found);
624 625 626 627
			}
			lock->nHolding--;
			lock->holders[lockt]--;
			SpinRelease(masterLock);
628
#ifdef USER_LOCKS_DEBUG
629
			elog(NOTICE, "LockAcquire: user lock failed");
630
#endif
631 632
			return (FALSE);
		}
633
#endif
634 635
		status = WaitOnLock(ltable, tableId, lock, lockt);
		XID_PRINT("Someone granted me the lock", result);
636
	}
637 638 639 640

	SpinRelease(masterLock);

	return (status == STATUS_OK);
641 642 643 644 645 646
}

/* ----------------------------
 * LockResolveConflicts -- test for lock conflicts
 *
 * NOTES:
647
 *		Here's what makes this complicated: one transaction's
648 649 650 651 652
 * locks don't conflict with one another.  When many processes
 * hold locks, each has to subtract off the other's locks when
 * determining whether or not any new lock acquired conflicts with
 * the old ones.
 *
653 654 655 656
 *	For example, if I am already holding a WRITE_INTENT lock,
 *	there will not be a conflict with my own READ_LOCK.  If I
 *	don't consider the intent lock when checking for conflicts,
 *	I find no conflict.
657 658 659
 * ----------------------------
 */
int
660 661
LockResolveConflicts(LOCKTAB *ltable,
					 LOCK *lock,
662 663
					 LOCKT lockt,
					 TransactionId xid)
664
{
665 666 667 668 669 670 671 672 673
	XIDLookupEnt *result,
				item;
	int		   *myHolders;
	int			nLockTypes;
	HTAB	   *xidTable;
	bool		found;
	int			bitmask;
	int			i,
				tmpMask;
674 675 676 677 678 679 680 681 682 683 684 685

	nLockTypes = ltable->ctl->nLockTypes;
	xidTable = ltable->xidHash;

	/* ---------------------
	 * read my own statistics from the xid table.  If there
	 * isn't an entry, then we'll just add one.
	 *
	 * Zero out the tag, this clears the padding bytes for long
	 * word alignment and ensures hashing consistency.
	 * ------------------
	 */
Bruce Momjian's avatar
Bruce Momjian committed
686
	MemSet(&item, 0, XID_TAGSIZE);
687 688
	TransactionIdStore(xid, &item.tag.xid);
	item.tag.lock = MAKE_OFFSET(lock);
689
#if 0
690
	item.tag.pid = pid;
691
#endif
692 693

	if (!(result = (XIDLookupEnt *)
694
		  hash_search(xidTable, (Pointer) &item, HASH_ENTER, &found)))
695
	{
696 697
		elog(NOTICE, "LockResolveConflicts: xid table corrupted");
		return (STATUS_ERROR);
698
	}
699 700 701
	myHolders = result->holders;

	if (!found)
702
	{
703 704 705 706 707
		/* ---------------
		 * we're not holding any type of lock yet.  Clear
		 * the lock stats.
		 * ---------------
		 */
Bruce Momjian's avatar
Bruce Momjian committed
708
		MemSet(result->holders, 0, nLockTypes * sizeof(*(lock->holders)));
709
		result->nHolding = 0;
710
	}
711

712 713 714 715 716 717 718 719 720 721 722 723 724 725
	{
		/* ------------------------
		 * If someone with a greater priority is waiting for the lock,
		 * do not continue and share the lock, even if we can.  bjm
		 * ------------------------
		 */
		int				myprio = ltable->ctl->prio[lockt];
		PROC_QUEUE		*waitQueue = &(lock->waitProcs);
		PROC			*topproc = (PROC *) MAKE_PTR(waitQueue->links.prev);

		if (waitQueue->size && topproc->prio > myprio)
			return STATUS_FOUND;
	}

726 727 728 729 730 731 732 733 734 735 736
	/* ----------------------------
	 * first check for global conflicts: If no locks conflict
	 * with mine, then I get the lock.
	 *
	 * Checking for conflict: lock->mask represents the types of
	 * currently held locks.  conflictTable[lockt] has a bit
	 * set for each type of lock that conflicts with mine.	Bitwise
	 * compare tells if there is a conflict.
	 * ----------------------------
	 */
	if (!(ltable->ctl->conflictTab[lockt] & lock->mask))
737
	{
738 739 740 741 742 743 744

		result->holders[lockt]++;
		result->nHolding++;

		XID_PRINT("Conflict Resolved: updated xid entry stats", result);

		return (STATUS_OK);
745
	}
746 747 748 749 750 751 752 753 754 755

	/* ------------------------
	 * Rats.  Something conflicts. But it could still be my own
	 * lock.  We have to construct a conflict mask
	 * that does not reflect our own locks.
	 * ------------------------
	 */
	bitmask = 0;
	tmpMask = 2;
	for (i = 1; i <= nLockTypes; i++, tmpMask <<= 1)
756
	{
757
		if (lock->activeHolders[i] - myHolders[i])
758
		{
759
			bitmask |= tmpMask;
760 761
		}
	}
762 763 764 765 766 767 768 769 770

	/* ------------------------
	 * now check again for conflicts.  'bitmask' describes the types
	 * of locks held by other processes.  If one of these
	 * conflicts with the kind of lock that I want, there is a
	 * conflict and I have to sleep.
	 * ------------------------
	 */
	if (!(ltable->ctl->conflictTab[lockt] & bitmask))
771
	{
772 773 774 775 776 777 778 779 780 781

		/* no conflict. Get the lock and go on */

		result->holders[lockt]++;
		result->nHolding++;

		XID_PRINT("Conflict Resolved: updated xid entry stats", result);

		return (STATUS_OK);

782
	}
783 784

	return (STATUS_FOUND);
785 786
}

787
static int
788
WaitOnLock(LOCKTAB *ltable, LockTableId tableId, LOCK *lock, LOCKT lockt)
789
{
790
	PROC_QUEUE *waitQueue = &(lock->waitProcs);
791

792
	int			prio = ltable->ctl->prio[lockt];
793 794 795 796 797 798 799 800 801 802 803 804 805 806 807

	/*
	 * the waitqueue is ordered by priority. I insert myself according to
	 * the priority of the lock I am acquiring.
	 *
	 * SYNC NOTE: I am assuming that the lock table spinlock is sufficient
	 * synchronization for this queue.	That will not be true if/when
	 * people can be deleted from the queue by a SIGINT or something.
	 */
	LOCK_DUMP_AUX("WaitOnLock: sleeping on lock", lock, lockt);
	if (ProcSleep(waitQueue,
				  ltable->ctl->masterLock,
				  lockt,
				  prio,
				  lock) != NO_ERROR)
808
	{
809 810 811 812 813 814 815 816 817 818
		/* -------------------
		 * This could have happend as a result of a deadlock, see HandleDeadLock()
		 * Decrement the lock nHolding and holders fields as we are no longer
		 * waiting on this lock.
		 * -------------------
		 */
		lock->nHolding--;
		lock->holders[lockt]--;
		LOCK_DUMP_AUX("WaitOnLock: aborting on lock", lock, lockt);
		SpinRelease(ltable->ctl->masterLock);
819
		elog(ERROR, "WaitOnLock: error on wakeup - Aborting this transaction");
820
	}
821 822 823

	LOCK_DUMP_AUX("WaitOnLock: wakeup on lock", lock, lockt);
	return (STATUS_OK);
824 825 826 827
}

/*
 * LockRelease -- look up 'lockName' in lock table 'tableId' and
828
 *		release it.
829 830
 *
 * Side Effects: if the lock no longer conflicts with the highest
831 832 833 834
 *		priority waiting process, that process is granted the lock
 *		and awoken. (We have to grant the lock here to avoid a
 *		race between the waking process and any new process to
 *		come along and request the lock).
835 836
 */
bool
837
LockRelease(LockTableId tableId, LOCKTAG *lockName, LOCKT lockt)
838
{
839 840 841 842 843 844 845 846
	LOCK	   *lock = NULL;
	SPINLOCK	masterLock;
	bool		found;
	LOCKTAB    *ltable;
	XIDLookupEnt *result,
				item;
	HTAB	   *xidTable;
	bool		wakeupNeeded = true;
847

848
#ifdef USER_LOCKS
849
	int			is_user_lock;
850

851 852 853 854
	is_user_lock = (tableId == 0);
	if (is_user_lock)
	{
		tableId = 1;
855
#ifdef USER_LOCKS_DEBUG
856 857 858 859 860
		elog(NOTICE, "LockRelease: user lock tag [%u,%u] %d",
			 lockName->tupleId.ip_posid,
			 ((lockName->tupleId.ip_blkid.bi_hi << 16) +
			  lockName->tupleId.ip_blkid.bi_lo),
			 lockt);
861
#endif
862
	}
863 864
#endif

865 866 867 868 869 870 871 872 873
	Assert(tableId < NumTables);
	ltable = AllTables[tableId];
	if (!ltable)
	{
		elog(NOTICE, "ltable is null in LockRelease");
		return (FALSE);
	}

	if (LockingIsDisabled)
874
	{
875
		return (TRUE);
876
	}
877 878 879 880 881 882 883 884 885 886 887 888

	LOCK_PRINT("Release", lockName, lockt);

	masterLock = ltable->ctl->masterLock;
	xidTable = ltable->xidHash;

	SpinAcquire(masterLock);

	Assert(ltable->lockHash->hash == tag_hash);
	lock = (LOCK *)
		hash_search(ltable->lockHash, (Pointer) lockName, HASH_FIND_SAVE, &found);

889
#ifdef USER_LOCKS
890 891 892 893 894 895 896 897 898 899 900

	/*
	 * If the entry is not found hash_search returns TRUE instead of NULL,
	 * so we must check it explicitly.
	 */
	if ((is_user_lock) && (lock == (LOCK *) TRUE))
	{
		SpinRelease(masterLock);
		elog(NOTICE, "LockRelease: there are no locks with this tag");
		return (FALSE);
	}
901 902
#endif

903
	/*
904
	 * let the caller print its own error message, too. Do not elog(ERROR).
905 906
	 */
	if (!lock)
907
	{
908 909 910
		SpinRelease(masterLock);
		elog(NOTICE, "LockRelease: locktable corrupted");
		return (FALSE);
911
	}
912 913

	if (!found)
914
	{
915 916 917
		SpinRelease(masterLock);
		elog(NOTICE, "LockRelease: locktable lookup failed, no lock");
		return (FALSE);
918
	}
919 920 921

	Assert(lock->nHolding > 0);

922
#ifdef USER_LOCKS
923 924 925 926 927 928 929

	/*
	 * If this is an user lock it can be removed only after checking that
	 * it was acquired by the current process, so this code is skipped and
	 * executed later.
	 */
	if (!is_user_lock)
930
	{
931 932 933 934 935 936 937 938 939 940 941 942 943 944 945 946 947 948 949 950 951 952
#endif

		/*
		 * fix the general lock stats
		 */
		lock->nHolding--;
		lock->holders[lockt]--;
		lock->nActive--;
		lock->activeHolders[lockt]--;

		Assert(lock->nActive >= 0);

		if (!lock->nHolding)
		{
			/* ------------------
			 * if there's no one waiting in the queue,
			 * we just released the last lock.
			 * Delete it from the lock table.
			 * ------------------
			 */
			Assert(ltable->lockHash->hash == tag_hash);
			lock = (LOCK *) hash_search(ltable->lockHash,
953
										(Pointer) &(lock->tag),
954 955 956 957 958
										HASH_REMOVE_SAVED,
										&found);
			Assert(lock && found);
			wakeupNeeded = false;
		}
959
#ifdef USER_LOCKS
960
	}
961
#endif
962 963 964 965 966 967

	/* ------------------
	 * Zero out all of the tag bytes (this clears the padding bytes for long
	 * word alignment and ensures hashing consistency).
	 * ------------------
	 */
Bruce Momjian's avatar
Bruce Momjian committed
968
	MemSet(&item, 0, XID_TAGSIZE);
969 970 971

	TransactionIdStore(GetCurrentTransactionId(), &item.tag.xid);
	item.tag.lock = MAKE_OFFSET(lock);
972
#if 0
973
	item.tag.pid = MyPid;
974
#endif
975

976
#ifdef USER_LOCKS
977 978
	if (is_user_lock)
	{
Bruce Momjian's avatar
Bruce Momjian committed
979
		item.tag.pid = MyProcPid;
980
		item.tag.xid = 0;
981
#ifdef USER_LOCKS_DEBUG
982 983
		elog(NOTICE, "LockRelease: user lock xid [%d,%d,%d]",
			 item.tag.lock, item.tag.pid, item.tag.xid);
984
#endif
985
	}
986 987
#endif

988
	if (!(result = (XIDLookupEnt *) hash_search(xidTable,
989
												(Pointer) &item,
990 991 992
												HASH_FIND_SAVE,
												&found))
		|| !found)
993
	{
994
		SpinRelease(masterLock);
995
#ifdef USER_LOCKS
996 997 998 999 1000 1001 1002 1003
		if ((is_user_lock) && (result))
		{
			elog(NOTICE, "LockRelease: you don't have a lock on this tag");
		}
		else
		{
			elog(NOTICE, "LockRelease: find xid, table corrupted");
		}
1004
#else
1005
		elog(NOTICE, "LockReplace: xid table corrupted");
1006
#endif
1007
		return (FALSE);
1008
	}
1009 1010 1011 1012 1013 1014 1015 1016 1017 1018 1019 1020 1021 1022 1023

	/*
	 * now check to see if I have any private locks.  If I do, decrement
	 * the counts associated with them.
	 */
	result->holders[lockt]--;
	result->nHolding--;

	XID_PRINT("LockRelease updated xid stats", result);

	/*
	 * If this was my last hold on this lock, delete my entry in the XID
	 * table.
	 */
	if (!result->nHolding)
1024
	{
1025
#ifdef USER_LOCKS
1026 1027 1028 1029 1030 1031 1032 1033
		if (result->queue.prev == INVALID_OFFSET)
		{
			elog(NOTICE, "LockRelease: xid.prev == INVALID_OFFSET");
		}
		if (result->queue.next == INVALID_OFFSET)
		{
			elog(NOTICE, "LockRelease: xid.next == INVALID_OFFSET");
		}
1034
#endif
1035 1036 1037
		if (result->queue.next != INVALID_OFFSET)
			SHMQueueDelete(&result->queue);
		if (!(result = (XIDLookupEnt *)
1038
			  hash_search(xidTable, (Pointer) &item, HASH_REMOVE_SAVED, &found)) ||
1039
			!found)
1040
		{
1041
			SpinRelease(masterLock);
1042
#ifdef USER_LOCKS
1043
			elog(NOTICE, "LockRelease: remove xid, table corrupted");
1044
#else
1045
			elog(NOTICE, "LockReplace: xid table corrupted");
1046
#endif
1047
			return (FALSE);
1048 1049
		}
	}
1050

1051
#ifdef USER_LOCKS
1052 1053 1054 1055 1056 1057

	/*
	 * If this is an user lock remove it now, after the corresponding xid
	 * entry has been found and deleted.
	 */
	if (is_user_lock)
1058
	{
1059 1060 1061 1062 1063 1064 1065 1066 1067 1068 1069 1070 1071 1072 1073 1074 1075 1076 1077 1078 1079

		/*
		 * fix the general lock stats
		 */
		lock->nHolding--;
		lock->holders[lockt]--;
		lock->nActive--;
		lock->activeHolders[lockt]--;

		Assert(lock->nActive >= 0);

		if (!lock->nHolding)
		{
			/* ------------------
			 * if there's no one waiting in the queue,
			 * we just released the last lock.
			 * Delete it from the lock table.
			 * ------------------
			 */
			Assert(ltable->lockHash->hash == tag_hash);
			lock = (LOCK *) hash_search(ltable->lockHash,
1080
										(Pointer) &(lock->tag),
1081 1082 1083 1084 1085
										HASH_REMOVE,
										&found);
			Assert(lock && found);
			wakeupNeeded = false;
		}
1086 1087 1088
	}
#endif

1089 1090 1091 1092 1093 1094 1095
	/* --------------------------
	 * If there are still active locks of the type I just released, no one
	 * should be woken up.	Whoever is asleep will still conflict
	 * with the remaining locks.
	 * --------------------------
	 */
	if (!(lock->activeHolders[lockt]))
1096
	{
1097 1098
		/* change the conflict mask.  No more of this lock type. */
		lock->mask &= BITS_OFF[lockt];
1099
	}
1100 1101

	if (wakeupNeeded)
1102
	{
1103 1104 1105 1106 1107 1108 1109
		/* --------------------------
		 * Wake the first waiting process and grant him the lock if it
		 * doesn't conflict.  The woken process must record the lock
		 * himself.
		 * --------------------------
		 */
		ProcLockWakeup(&(lock->waitProcs), (char *) ltable, (char *) lock);
1110
	}
1111 1112 1113

	SpinRelease(masterLock);
	return (TRUE);
1114 1115 1116 1117
}

/*
 * GrantLock -- udpate the lock data structure to show
1118
 *		the new lock holder.
1119 1120
 */
void
1121
GrantLock(LOCK *lock, LOCKT lockt)
1122
{
1123 1124 1125
	lock->nActive++;
	lock->activeHolders[lockt]++;
	lock->mask |= BITS_ON[lockt];
1126 1127
}

1128 1129 1130 1131 1132
#ifdef USER_LOCKS
/*
 * LockReleaseAll -- Release all locks in a process lock queue.
 *
 * Note: This code is a little complicated by the presence in the
 *		 same queue of user locks which can't be removed from the
 *		 normal lock queue at the end of a transaction. They must
 *		 however be removed when the backend exits.
 *		 A dummy tableId 0 is used to indicate that we are releasing
 *		 the user locks, from the code added to ProcKill().
 */
#endif
bool
LockReleaseAll(LockTableId tableId, SHM_QUEUE *lockQueue)
{
	PROC_QUEUE *waitQueue;
	int			done;
	XIDLookupEnt *xidLook = NULL;
	XIDLookupEnt *tmp = NULL;
	SHMEM_OFFSET end = MAKE_OFFSET(lockQueue);
	SPINLOCK	masterLock;
	LOCKTAB    *ltable;
	int			i,
				nLockTypes;
	LOCK	   *lock;
	bool		found;

#ifdef USER_LOCKS
	int			is_user_lock_table,
				count,
				nskip;

	/* tableId 0 is a pseudo-id meaning "release this backend's user locks" */
	is_user_lock_table = (tableId == 0);
#ifdef USER_LOCKS_DEBUG
	elog(NOTICE, "LockReleaseAll: tableId=%d, pid=%d", tableId, MyProcPid);
#endif
	if (is_user_lock_table)
	{
		/* user locks live in the same (single) real lock table, id 1 */
		tableId = 1;
	}
#endif

	Assert(tableId < NumTables);
	ltable = AllTables[tableId];
	if (!ltable)
		return (FALSE);

	nLockTypes = ltable->ctl->nLockTypes;
	masterLock = ltable->ctl->masterLock;

	/* nothing queued: nothing to release */
	if (SHMQueueEmpty(lockQueue))
		return TRUE;

#ifdef USER_LOCKS
	/*
	 * With USER_LOCKS the spinlock is taken before the first queue probe;
	 * without it, it is taken just before the loop (see #ifndef below).
	 */
	SpinAcquire(masterLock);
#endif
	SHMQueueFirst(lockQueue, (Pointer *) &xidLook, &xidLook->queue);

	XID_PRINT("LockReleaseAll", xidLook);

#ifndef USER_LOCKS
	SpinAcquire(masterLock);
#else
	count = nskip = 0;
#endif
	for (;;)
	{
		/* ---------------------------
		 * XXX Here we assume the shared memory queue is circular and
		 * that we know its internal structure.  Should have some sort of
		 * macros to allow one to walk it.	mer 20 July 1991
		 * ---------------------------
		 */
		/* remember whether this is the last entry BEFORE we unlink it */
		done = (xidLook->queue.next == end);
		lock = (LOCK *) MAKE_PTR(xidLook->tag.lock);

		LOCK_PRINT("LockReleaseAll", (&lock->tag), 0);

#ifdef USER_LOCKS

		/*
		 * Sometimes the queue appears to be messed up.
		 */
		/* defensive cap on iterations to avoid spinning forever on a
		 * corrupted circular queue; 2000 is an arbitrary sanity bound */
		if (count++ > 2000)
		{
			elog(NOTICE, "LockReleaseAll: xid loop detected, giving up");
			nskip = 0;
			break;
		}
		if (is_user_lock_table)
		{
			/*
			 * User-lock entries are tagged with pid != 0 and xid == 0
			 * (inferred from these tag tests); anything else is a normal
			 * transaction lock and must stay in the queue here.
			 */
			if ((xidLook->tag.pid == 0) || (xidLook->tag.xid != 0))
			{
#ifdef USER_LOCKS_DEBUG
				elog(NOTICE, "LockReleaseAll: skip normal lock [%d,%d,%d]",
				  xidLook->tag.lock, xidLook->tag.pid, xidLook->tag.xid);
#endif
				nskip++;
				goto next_item;
			}
			if (xidLook->tag.pid != MyProcPid)
			{
				/* This should never happen */
#ifdef USER_LOCKS_DEBUG
				elog(NOTICE,
					 "LockReleaseAll: skip other pid [%u,%u] [%d,%d,%d]",
					 lock->tag.tupleId.ip_posid,
					 ((lock->tag.tupleId.ip_blkid.bi_hi << 16) +
					  lock->tag.tupleId.ip_blkid.bi_lo),
				  xidLook->tag.lock, xidLook->tag.pid, xidLook->tag.xid);
#endif
				nskip++;
				goto next_item;
			}
#ifdef USER_LOCKS_DEBUG
			elog(NOTICE,
				 "LockReleaseAll: release user lock [%u,%u] [%d,%d,%d]",
				 lock->tag.tupleId.ip_posid,
				 ((lock->tag.tupleId.ip_blkid.bi_hi << 16) +
				  lock->tag.tupleId.ip_blkid.bi_lo),
				 xidLook->tag.lock, xidLook->tag.pid, xidLook->tag.xid);
#endif
		}
		else
		{
			/* end-of-transaction release: skip user locks (pid != 0),
			 * release only transaction locks (xid != 0) */
			if ((xidLook->tag.pid != 0) || (xidLook->tag.xid == 0))
			{
#ifdef USER_LOCKS_DEBUG
				elog(NOTICE,
					 "LockReleaseAll: skip user lock [%u,%u] [%d,%d,%d]",
					 lock->tag.tupleId.ip_posid,
					 ((lock->tag.tupleId.ip_blkid.bi_hi << 16) +
					  lock->tag.tupleId.ip_blkid.bi_lo),
				  xidLook->tag.lock, xidLook->tag.pid, xidLook->tag.xid);
#endif
				nskip++;
				goto next_item;
			}
#ifdef USER_LOCKS_DEBUG
			elog(NOTICE, "LockReleaseAll: release normal lock [%d,%d,%d]",
				 xidLook->tag.lock, xidLook->tag.pid, xidLook->tag.xid);
#endif
		}
#endif

		/* ------------------
		 * fix the general lock stats
		 * ------------------
		 */
		if (lock->nHolding != xidLook->nHolding)
		{
			/* other processes still hold this lock: subtract only our share */
			lock->nHolding -= xidLook->nHolding;
			lock->nActive -= xidLook->nHolding;
			Assert(lock->nActive >= 0);
			for (i = 1; i <= nLockTypes; i++)
			{
				lock->holders[i] -= xidLook->holders[i];
				lock->activeHolders[i] -= xidLook->holders[i];
				/* no active holders of this type left: clear its mask bit */
				if (!lock->activeHolders[i])
					lock->mask &= BITS_OFF[i];
			}
		}
		else
		{
			/* --------------
			 * set nHolding to zero so that we can garbage collect the lock
			 * down below...
			 * --------------
			 */
			lock->nHolding = 0;
		}
		/* ----------------
		 * always remove the xidLookup entry, we're done with it now
		 * ----------------
		 */
#ifdef USER_LOCKS
		/* unlink from the process queue before removing from the hash */
		SHMQueueDelete(&xidLook->queue);
#endif
		if ((!hash_search(ltable->xidHash, (Pointer) xidLook, HASH_REMOVE, &found))
			|| !found)
		{
			/* must not return with the master spinlock still held */
			SpinRelease(masterLock);
			elog(NOTICE, "LockReleaseAll: xid table corrupted");
			return (FALSE);
		}

		if (!lock->nHolding)
		{
			/* --------------------
			 * if there's no one waiting in the queue, we've just released
			 * the last lock.
			 * --------------------
			 */

			Assert(ltable->lockHash->hash == tag_hash);
			lock = (LOCK *)
				hash_search(ltable->lockHash, (Pointer) &(lock->tag), HASH_REMOVE, &found);
			if ((!lock) || (!found))
			{
				SpinRelease(masterLock);
				elog(NOTICE, "LockReleaseAll: cannot remove lock from HTAB");
				return (FALSE);
			}
		}
		else
		{
			/* --------------------
			 * Wake the first waiting process and grant him the lock if it
			 * doesn't conflict.  The woken process must record the lock
			 * him/herself.
			 * --------------------
			 */
			waitQueue = &(lock->waitProcs);
			ProcLockWakeup(waitQueue, (char *) ltable, (char *) lock);
		}

#ifdef USER_LOCKS
next_item:
#endif
		if (done)
			break;
		/* advance to the next queue entry (xidLook->queue still points
		 * into the queue even after SHMQueueDelete relinked it) */
		SHMQueueFirst(&xidLook->queue, (Pointer *) &tmp, &tmp->queue);
		xidLook = tmp;
	}
	SpinRelease(masterLock);
#ifdef USER_LOCKS

	/*
	 * Reinitialize the queue only if nothing has been left in.
	 */
	if (nskip == 0)
#endif
		SHMQueueInit(lockQueue);
	return TRUE;
}

/*
 * LockShmemSize -- estimate the amount of shared memory needed for
 *		the per-backend PROC structures plus the lock and xid hash
 *		tables.
 *
 * Returns the number of bytes to reserve.  The hash-table arithmetic
 * mirrors the segment/bucket layout built by dynahash (hash_create).
 */
int
LockShmemSize()
{
	int			size = 0;
	int			nLockBuckets,
				nLockSegs;
	int			nXidBuckets,
				nXidSegs;

	/* bucket and segment counts for the lock hash table */
	nLockBuckets = 1 << (int) my_log2((NLOCKENTS - 1) / DEF_FFACTOR + 1);
	nLockSegs = 1 << (int) my_log2((nLockBuckets - 1) / DEF_SEGSIZE + 1);

	/*
	 * bucket and segment counts for the xid hash table.
	 * BUGFIX: derive nXidSegs from nXidBuckets, not nLockBuckets
	 * (the old code reused nLockBuckets here by copy-paste mistake,
	 * so the xid table size could be over- or under-estimated).
	 */
	nXidBuckets = 1 << (int) my_log2((NLOCKS_PER_XACT - 1) / DEF_FFACTOR + 1);
	nXidSegs = 1 << (int) my_log2((nXidBuckets - 1) / DEF_SEGSIZE + 1);

	size += MAXALIGN(NBACKENDS * sizeof(PROC)); /* each MyProc */
	size += MAXALIGN(NBACKENDS * sizeof(LOCKCTL));		/* each ltable->ctl */
	size += MAXALIGN(sizeof(PROC_HDR)); /* ProcGlobal */

	/* lockHash: directory, header, segments, and entries */
	size += MAXALIGN(my_log2(NLOCKENTS) * sizeof(void *));
	size += MAXALIGN(sizeof(HHDR));
	size += nLockSegs * MAXALIGN(DEF_SEGSIZE * sizeof(SEGMENT));
	size += NLOCKENTS *			/* XXX not multiple of BUCKET_ALLOC_INCR? */
		(MAXALIGN(sizeof(BUCKET_INDEX)) +
		 MAXALIGN(sizeof(LOCK)));		/* contains hash key */

	/* xidHash: directory, header, segments, and entries */
	size += MAXALIGN(my_log2(NBACKENDS) * sizeof(void *));
	size += MAXALIGN(sizeof(HHDR));
	size += nXidSegs * MAXALIGN(DEF_SEGSIZE * sizeof(SEGMENT));
	size += NBACKENDS *			/* XXX not multiple of BUCKET_ALLOC_INCR? */
		(MAXALIGN(sizeof(BUCKET_INDEX)) +
		 MAXALIGN(sizeof(XIDLookupEnt)));		/* contains hash key */

	return size;
}

/* -----------------
 * Boolean function to determine current locking status
 * -----------------
 */
bool
LockingDisabled()
{
1407
	return LockingIsDisabled;
1408
}
1409

Bruce Momjian's avatar
Bruce Momjian committed
1410 1411 1412 1413 1414 1415 1416 1417 1418 1419 1420 1421 1422 1423 1424 1425 1426 1427 1428 1429 1430 1431 1432 1433 1434 1435 1436 1437 1438 1439 1440 1441 1442 1443 1444 1445 1446 1447 1448 1449 1450 1451 1452 1453 1454 1455 1456 1457 1458 1459 1460 1461 1462 1463 1464 1465 1466 1467 1468 1469 1470 1471 1472 1473 1474 1475 1476 1477 1478 1479 1480 1481 1482 1483 1484 1485 1486 1487 1488 1489
/*
 * DeadlockCheck -- Checks for deadlocks for a given process
 *
 * We can't block on user locks, so no sense testing for deadlock
 * because there is no blocking, and no timer for the block.
 *
 * This code takes a list of locks a process holds, and the lock that
 * the process is sleeping on, and tries to find if any of the processes
 * waiting on its locks hold the lock it is waiting for.
 *
 * We have already locked the master lock before being called.
 */
bool
DeadLockCheck(SHM_QUEUE *lockQueue, LOCK *findlock, bool skip_check)
{
	int			done;
	XIDLookupEnt *xidLook = NULL;
	XIDLookupEnt *tmp = NULL;
	SHMEM_OFFSET end = MAKE_OFFSET(lockQueue);
	LOCK	   *lock;

	/* a process holding no locks cannot be part of a cycle */
	if (SHMQueueEmpty(lockQueue))
		return false;

	SHMQueueFirst(lockQueue, (Pointer *) &xidLook, &xidLook->queue);

	XID_PRINT("DeadLockCheck", xidLook);

	for (;;)
	{
		/* ---------------------------
		 * XXX Here we assume the shared memory queue is circular and
		 * that we know its internal structure.  Should have some sort of
		 * macros to allow one to walk it.	mer 20 July 1991
		 * ---------------------------
		 */
		done = (xidLook->queue.next == end);
		lock = (LOCK *) MAKE_PTR(xidLook->tag.lock);

		LOCK_PRINT("DeadLockCheck", (&lock->tag), 0);

		/*
		 * This is our only check to see if we found the lock we want.
		 *
		 * The lock we are waiting for is already in MyProc->lockQueue
		 * so we need to skip it here.  We are trying to find it in
		 * someone else's lockQueue.
		 *
		 * skip_check is true only on the top-level call (our own queue);
		 * recursive calls pass false, so finding 'findlock' in another
		 * process's queue reports a cycle.
		 */
		if (lock == findlock && !skip_check)
			return true;
		else if (lock != findlock || !skip_check)
		{
			PROC_QUEUE  *waitQueue = &(lock->waitProcs);
			PROC		*proc;
			int			i;
			
			/* walk the waiters of this lock; NOTE(review): the queue is
			 * traversed via links.prev — presumably the wait queue's
			 * internal link direction; confirm against proc.c */
			proc = (PROC *) MAKE_PTR(waitQueue->links.prev);
			for (i = 0; i < waitQueue->size; i++)
			{
				/* prevent endless loops */
				/* recurse only from the top level (skip_check), and never
				 * into our own queue, so the search depth is bounded */
				if (proc != MyProc && skip_check)
				{
					/* If we found a deadlock, we can stop right now */
					if (DeadLockCheck(&(proc->lockQueue), findlock, false))
						return true;
				}
				proc = (PROC *) MAKE_PTR(proc->links.prev);
			}
		}
			
		if (done)
			break;
		SHMQueueFirst(&xidLook->queue, (Pointer *) &tmp, &tmp->queue);
		xidLook = tmp;
	}

	/* if we got here, no deadlock */
	return false;
}

1490 1491 1492 1493 1494 1495 1496
#ifdef DEADLOCK_DEBUG
/*
 * Dump all locks in MyProc's lockQueue. Must have already acquired
 * the masterLock.  Debug-only (DEADLOCK_DEBUG) helper; output goes
 * through the LOCK_DUMP/XID_PRINT macros.
 */
void
DumpLocks()
{
	SHMEM_OFFSET location;
	PROC	   *proc;
	SHM_QUEUE  *lockQueue;
	int			done;
	XIDLookupEnt *xidLook = NULL;
	XIDLookupEnt *tmp = NULL;
	SHMEM_OFFSET end;
	SPINLOCK	masterLock;
	int			nLockTypes;
	LOCK	   *lock;
	int			count;			/* BUGFIX: 'count' had lost its type
								 * declaration and did not compile */
	int			tableId = 1;
	LOCKTAB    *ltable;

	/* find our own PROC struct via the shmem PID table; bail out on
	 * any inconsistency rather than dump garbage */
	ShmemPIDLookup(MyProcPid, &location);
	if (location == INVALID_OFFSET)
		return;
	proc = (PROC *) MAKE_PTR(location);
	if (proc != MyProc)
		return;
	lockQueue = &proc->lockQueue;

	Assert(tableId < NumTables);
	ltable = AllTables[tableId];
	if (!ltable)
		return;

	/* NOTE(review): nLockTypes and masterLock appear unused below
	 * unless the LOCK_DUMP macro references them — confirm */
	nLockTypes = ltable->ctl->nLockTypes;
	masterLock = ltable->ctl->masterLock;

	if (SHMQueueEmpty(lockQueue))
		return;

	SHMQueueFirst(lockQueue, (Pointer *) &xidLook, &xidLook->queue);
	end = MAKE_OFFSET(lockQueue);

	LOCK_DUMP("DumpLocks", MyProc->waitLock, 0);
	XID_PRINT("DumpLocks", xidLook);

	for (count = 0;;)
	{
		/* ---------------------------
		 * XXX Here we assume the shared memory queue is circular and
		 * that we know its internal structure.  Should have some sort of
		 * macros to allow one to walk it.	mer 20 July 1991
		 * ---------------------------
		 */
		done = (xidLook->queue.next == end);
		lock = (LOCK *) MAKE_PTR(xidLook->tag.lock);

		LOCK_DUMP("DumpLocks", lock, 0);

		/* same defensive iteration cap as LockReleaseAll */
		if (count++ > 2000)
		{
			elog(NOTICE, "DumpLocks: xid loop detected, giving up");
			break;
		}

		if (done)
			break;
		SHMQueueFirst(&xidLook->queue, (Pointer *) &tmp, &tmp->queue);
		xidLook = tmp;
	}
}

#endif