Commit 122abf3a authored by Vadim B. Mikheev's avatar Vadim B. Mikheev

Fix LMGR for MVCC.

Get rid of Extend lock mode.
parent 86bc1da2
...@@ -7,7 +7,7 @@ ...@@ -7,7 +7,7 @@
* *
* *
* IDENTIFICATION * IDENTIFICATION
* $Id: hio.c,v 1.18 1999/05/01 15:04:46 vadim Exp $ * $Id: hio.c,v 1.19 1999/05/07 01:22:53 vadim Exp $
* *
*------------------------------------------------------------------------- *-------------------------------------------------------------------------
*/ */
...@@ -111,14 +111,13 @@ RelationPutHeapTupleAtEnd(Relation relation, HeapTuple tuple) ...@@ -111,14 +111,13 @@ RelationPutHeapTupleAtEnd(Relation relation, HeapTuple tuple)
Item item; Item item;
/* /*
* Actually, we lock _relation_ here, not page, but I believe * Lock relation for extention. We can use LockPage here as long as
* that locking page is faster... Obviously, we could get rid * in all other places we use page-level locking for indices only.
* of ExtendLock mode at all and use ExclusiveLock mode on * Alternatevely, we could define pseudo-table as we do for
* page 0, as long as we use page-level locking for indices only, * transactions with XactLockTable.
* but we are in 6.5-beta currently... - vadim 05/01/99
*/ */
if (!relation->rd_myxactonly) if (!relation->rd_myxactonly)
LockPage(relation, 0, ExtendLock); LockPage(relation, 0, ExclusiveLock);
/* /*
* XXX This does an lseek - VERY expensive - but at the moment it is * XXX This does an lseek - VERY expensive - but at the moment it is
...@@ -166,7 +165,7 @@ RelationPutHeapTupleAtEnd(Relation relation, HeapTuple tuple) ...@@ -166,7 +165,7 @@ RelationPutHeapTupleAtEnd(Relation relation, HeapTuple tuple)
} }
if (!relation->rd_myxactonly) if (!relation->rd_myxactonly)
UnlockPage(relation, 0, ExtendLock); UnlockPage(relation, 0, ExclusiveLock);
offnum = PageAddItem((Page) pageHeader, (Item) tuple->t_data, offnum = PageAddItem((Page) pageHeader, (Item) tuple->t_data,
tuple->t_len, InvalidOffsetNumber, LP_USED); tuple->t_len, InvalidOffsetNumber, LP_USED);
......
...@@ -7,7 +7,7 @@ ...@@ -7,7 +7,7 @@
* *
* *
* IDENTIFICATION * IDENTIFICATION
* $Header: /cvsroot/pgsql/src/backend/storage/lmgr/lmgr.c,v 1.22 1999/02/13 23:18:24 momjian Exp $ * $Header: /cvsroot/pgsql/src/backend/storage/lmgr/lmgr.c,v 1.23 1999/05/07 01:23:02 vadim Exp $
* *
*------------------------------------------------------------------------- *-------------------------------------------------------------------------
*/ */
...@@ -79,9 +79,6 @@ static MASK LockConflicts[] = { ...@@ -79,9 +79,6 @@ static MASK LockConflicts[] = {
(1 << RowExclusiveLock) | (1 << RowShareLock) | (1 << AccessExclusiveLock) | (1 << RowExclusiveLock) | (1 << RowShareLock) | (1 << AccessExclusiveLock) |
(1 << AccessShareLock), (1 << AccessShareLock),
/* ExtendLock */
(1 << ExtendLock)
}; };
static int LockPrios[] = { static int LockPrios[] = {
...@@ -92,8 +89,7 @@ static int LockPrios[] = { ...@@ -92,8 +89,7 @@ static int LockPrios[] = {
4, 4,
5, 5,
6, 6,
7, 7
1
}; };
LOCKMETHOD LockTableId = (LOCKMETHOD) NULL; LOCKMETHOD LockTableId = (LOCKMETHOD) NULL;
......
This diff is collapsed.
...@@ -7,7 +7,7 @@ ...@@ -7,7 +7,7 @@
* *
* *
* IDENTIFICATION * IDENTIFICATION
* $Header: /cvsroot/pgsql/src/backend/storage/lmgr/proc.c,v 1.53 1999/04/30 02:04:51 momjian Exp $ * $Header: /cvsroot/pgsql/src/backend/storage/lmgr/proc.c,v 1.54 1999/05/07 01:23:04 vadim Exp $
* *
*------------------------------------------------------------------------- *-------------------------------------------------------------------------
*/ */
...@@ -46,7 +46,7 @@ ...@@ -46,7 +46,7 @@
* This is so that we can support more backends. (system-wide semaphore * This is so that we can support more backends. (system-wide semaphore
* sets run out pretty fast.) -ay 4/95 * sets run out pretty fast.) -ay 4/95
* *
* $Header: /cvsroot/pgsql/src/backend/storage/lmgr/proc.c,v 1.53 1999/04/30 02:04:51 momjian Exp $ * $Header: /cvsroot/pgsql/src/backend/storage/lmgr/proc.c,v 1.54 1999/05/07 01:23:04 vadim Exp $
*/ */
#include <sys/time.h> #include <sys/time.h>
#include <unistd.h> #include <unistd.h>
...@@ -106,6 +106,8 @@ static void ProcKill(int exitStatus, int pid); ...@@ -106,6 +106,8 @@ static void ProcKill(int exitStatus, int pid);
static void ProcGetNewSemKeyAndNum(IPCKey *key, int *semNum); static void ProcGetNewSemKeyAndNum(IPCKey *key, int *semNum);
static void ProcFreeSem(IpcSemaphoreKey semKey, int semNum); static void ProcFreeSem(IpcSemaphoreKey semKey, int semNum);
static char *DeadLockMessage = "Deadlock detected -- See the lock(l) manual page for a possible cause.";
/* /*
* InitProcGlobal - * InitProcGlobal -
* initializes the global process table. We put it here so that * initializes the global process table. We put it here so that
...@@ -488,68 +490,80 @@ ProcQueueInit(PROC_QUEUE *queue) ...@@ -488,68 +490,80 @@ ProcQueueInit(PROC_QUEUE *queue)
*/ */
int int
ProcSleep(PROC_QUEUE *waitQueue,/* lock->waitProcs */ ProcSleep(PROC_QUEUE *waitQueue,/* lock->waitProcs */
SPINLOCK spinlock, LOCKMETHODCTL *lockctl,
int token, /* lockmode */ int token, /* lockmode */
int prio, LOCK *lock)
LOCK *lock,
TransactionId xid) /* needed by user locks, see below */
{ {
int i; int i;
SPINLOCK spinlock = lockctl->masterLock;
PROC *proc; PROC *proc;
int myMask = (1 << token);
int waitMask = lock->waitMask;
int aheadHolders[MAX_LOCKMODES];
bool selfConflict = (lockctl->conflictTab[token] & myMask),
prevSame = false;
bool deadlock_checked = false; bool deadlock_checked = false;
struct itimerval timeval, struct itimerval timeval,
dummy; dummy;
/* MyProc->token = token;
* If the first entries in the waitQueue have a greater priority than MyProc->waitLock = lock;
* we have, we must be a reader, and they must be a writers, and we
* must be here because the current holder is a writer or a reader but
* we don't share shared locks if a writer is waiting. We put
* ourselves after the writers. This way, we have a FIFO, but keep
* the readers together to give them decent priority, and no one
* starves. Because we group all readers together, a non-empty queue
* only has a few possible configurations:
*
* [readers] [writers] [readers][writers] [writers][readers]
* [writers][readers][writers]
*
* In a full queue, we would have a reader holding a lock, then a writer
* gets the lock, then a bunch of readers, made up of readers who
* could not share the first readlock because a writer was waiting,
* and new readers arriving while the writer had the lock. bjm
*/
proc = (PROC *) MAKE_PTR(waitQueue->links.prev); proc = (PROC *) MAKE_PTR(waitQueue->links.prev);
/* If we are a reader, and they are writers, skip past them */ /* if we don't conflict with any waiter - be first in queue */
for (i = 0; i < waitQueue->size && proc->prio > prio; i++) if (!(lockctl->conflictTab[token] & waitMask))
proc = (PROC *) MAKE_PTR(proc->links.prev); goto ins;
/* The rest of the queue is FIFO, with readers first, writers last */ for (i = 1; i < MAX_LOCKMODES; i++)
for (; i < waitQueue->size && proc->prio <= prio; i++) aheadHolders[i] = lock->activeHolders[i];
proc = (PROC *) MAKE_PTR(proc->links.prev); (aheadHolders[token])++;
MyProc->prio = prio; for (i = 0; i < waitQueue->size; i++)
MyProc->token = token; {
MyProc->waitLock = lock; /* am I waiting for him ? */
if (lockctl->conflictTab[token] & proc->holdLock)
{
/* is he waiting for me ? */
if (lockctl->conflictTab[proc->token] & MyProc->holdLock)
{
MyProc->errType = STATUS_ERROR;
elog(NOTICE, DeadLockMessage);
goto rt;
}
/* being waiting for him - go past */
}
/* if he waits for me */
else if (lockctl->conflictTab[proc->token] & MyProc->holdLock)
{
break;
}
/* if conflicting locks requested */
else if (lockctl->conflictTab[proc->token] & myMask)
{
/*
* If I request non self-conflicting lock and there
* are others requesting the same lock just before me -
* stay here.
*/
if (!selfConflict && prevSame)
break;
}
/*
* Last attempt to don't move any more: if we don't conflict
* with rest waiters in queue.
*/
else if (!(lockctl->conflictTab[token] & waitMask))
break;
#ifdef USER_LOCKS prevSame = (proc->token == token);
/* ------------------- (aheadHolders[proc->token])++;
* Currently, we only need this for the ProcWakeup routines. if (aheadHolders[proc->token] == lock->holders[proc->token])
* This must be 0 for user lock, so we can't just use the value waitMask &= ~ (1 << proc->token);
* from GetCurrentTransactionId(). proc = (PROC *) MAKE_PTR(proc->links.prev);
* ------------------- }
*/
TransactionIdStore(xid, &MyProc->xid);
#else
#ifndef LowLevelLocking
/* -------------------
* currently, we only need this for the ProcWakeup routines
* -------------------
*/
TransactionIdStore((TransactionId) GetCurrentTransactionId(), &MyProc->xid);
#endif
#endif
ins:;
/* ------------------- /* -------------------
* assume that these two operations are atomic (because * assume that these two operations are atomic (because
* of the spinlock). * of the spinlock).
...@@ -558,6 +572,7 @@ ProcSleep(PROC_QUEUE *waitQueue,/* lock->waitProcs */ ...@@ -558,6 +572,7 @@ ProcSleep(PROC_QUEUE *waitQueue,/* lock->waitProcs */
SHMQueueInsertTL(&(proc->links), &(MyProc->links)); SHMQueueInsertTL(&(proc->links), &(MyProc->links));
waitQueue->size++; waitQueue->size++;
lock->waitMask |= myMask;
SpinRelease(spinlock); SpinRelease(spinlock);
/* -------------- /* --------------
...@@ -608,6 +623,8 @@ ProcSleep(PROC_QUEUE *waitQueue,/* lock->waitProcs */ ...@@ -608,6 +623,8 @@ ProcSleep(PROC_QUEUE *waitQueue,/* lock->waitProcs */
*/ */
SpinAcquire(spinlock); SpinAcquire(spinlock);
rt:;
#ifdef LOCK_MGR_DEBUG #ifdef LOCK_MGR_DEBUG
/* Just to get meaningful debug messages from DumpLocks() */ /* Just to get meaningful debug messages from DumpLocks() */
MyProc->waitLock = (LOCK *) NULL; MyProc->waitLock = (LOCK *) NULL;
...@@ -655,9 +672,9 @@ int ...@@ -655,9 +672,9 @@ int
ProcLockWakeup(PROC_QUEUE *queue, LOCKMETHOD lockmethod, LOCK *lock) ProcLockWakeup(PROC_QUEUE *queue, LOCKMETHOD lockmethod, LOCK *lock)
{ {
PROC *proc; PROC *proc;
int count; int count = 0;
int trace_flag; int trace_flag;
int last_locktype = -1; int last_locktype = 0;
int queue_size = queue->size; int queue_size = queue->size;
Assert(queue->size >= 0); Assert(queue->size >= 0);
...@@ -666,7 +683,6 @@ ProcLockWakeup(PROC_QUEUE *queue, LOCKMETHOD lockmethod, LOCK *lock) ...@@ -666,7 +683,6 @@ ProcLockWakeup(PROC_QUEUE *queue, LOCKMETHOD lockmethod, LOCK *lock)
return STATUS_NOT_FOUND; return STATUS_NOT_FOUND;
proc = (PROC *) MAKE_PTR(queue->links.prev); proc = (PROC *) MAKE_PTR(queue->links.prev);
count = 0;
while ((queue_size--) && (proc)) while ((queue_size--) && (proc))
{ {
...@@ -678,7 +694,7 @@ ProcLockWakeup(PROC_QUEUE *queue, LOCKMETHOD lockmethod, LOCK *lock) ...@@ -678,7 +694,7 @@ ProcLockWakeup(PROC_QUEUE *queue, LOCKMETHOD lockmethod, LOCK *lock)
continue; continue;
/* /*
* This proc conflicts with locks held by others, ignored. * Does this proc conflict with locks held by others ?
*/ */
if (LockResolveConflicts(lockmethod, if (LockResolveConflicts(lockmethod,
lock, lock,
...@@ -686,6 +702,8 @@ ProcLockWakeup(PROC_QUEUE *queue, LOCKMETHOD lockmethod, LOCK *lock) ...@@ -686,6 +702,8 @@ ProcLockWakeup(PROC_QUEUE *queue, LOCKMETHOD lockmethod, LOCK *lock)
proc->xid, proc->xid,
(XIDLookupEnt *) NULL) != STATUS_OK) (XIDLookupEnt *) NULL) != STATUS_OK)
{ {
if (count != 0)
break;
last_locktype = proc->token; last_locktype = proc->token;
continue; continue;
} }
...@@ -828,7 +846,7 @@ HandleDeadLock(int sig) ...@@ -828,7 +846,7 @@ HandleDeadLock(int sig)
*/ */
UnlockLockTable(); UnlockLockTable();
elog(NOTICE, "Deadlock detected -- See the lock(l) manual page for a possible cause."); elog(NOTICE, DeadLockMessage);
return; return;
} }
......
...@@ -6,7 +6,7 @@ ...@@ -6,7 +6,7 @@
* *
* Copyright (c) 1994, Regents of the University of California * Copyright (c) 1994, Regents of the University of California
* *
* $Id: lmgr.h,v 1.18 1999/02/19 06:06:34 tgl Exp $ * $Id: lmgr.h,v 1.19 1999/05/07 01:23:05 vadim Exp $
* *
*------------------------------------------------------------------------- *-------------------------------------------------------------------------
*/ */
...@@ -25,8 +25,6 @@ ...@@ -25,8 +25,6 @@
#define ExclusiveLock 6 #define ExclusiveLock 6
#define AccessExclusiveLock 7 #define AccessExclusiveLock 7
#define ExtendLock 8
extern LOCKMETHOD LockTableId; extern LOCKMETHOD LockTableId;
......
...@@ -6,7 +6,7 @@ ...@@ -6,7 +6,7 @@
* *
* Copyright (c) 1994, Regents of the University of California * Copyright (c) 1994, Regents of the University of California
* *
* $Id: lock.h,v 1.24 1999/03/06 21:17:43 tgl Exp $ * $Id: lock.h,v 1.25 1999/05/07 01:23:07 vadim Exp $
* *
*------------------------------------------------------------------------- *-------------------------------------------------------------------------
*/ */
...@@ -41,7 +41,7 @@ typedef int LOCKMODE; ...@@ -41,7 +41,7 @@ typedef int LOCKMODE;
typedef int LOCKMETHOD; typedef int LOCKMETHOD;
/* MAX_LOCKMODES cannot be larger than the bits in MASK */ /* MAX_LOCKMODES cannot be larger than the bits in MASK */
#define MAX_LOCKMODES 9 #define MAX_LOCKMODES 8
/* /*
* MAX_LOCK_METHODS corresponds to the number of spin locks allocated in * MAX_LOCK_METHODS corresponds to the number of spin locks allocated in
...@@ -204,6 +204,7 @@ typedef struct LOCK ...@@ -204,6 +204,7 @@ typedef struct LOCK
/* data */ /* data */
int mask; int mask;
int waitMask;
PROC_QUEUE waitProcs; PROC_QUEUE waitProcs;
int holders[MAX_LOCKMODES]; int holders[MAX_LOCKMODES];
int nHolding; int nHolding;
......
...@@ -6,7 +6,7 @@ ...@@ -6,7 +6,7 @@
* *
* Copyright (c) 1994, Regents of the University of California * Copyright (c) 1994, Regents of the University of California
* *
* $Id: proc.h,v 1.20 1999/02/19 07:10:47 tgl Exp $ * $Id: proc.h,v 1.21 1999/05/07 01:23:07 vadim Exp $
* *
*------------------------------------------------------------------------- *-------------------------------------------------------------------------
*/ */
...@@ -48,8 +48,9 @@ typedef struct proc ...@@ -48,8 +48,9 @@ typedef struct proc
* were starting our xact: vacuum must not * were starting our xact: vacuum must not
* remove tuples deleted by xid >= xmin ! */ * remove tuples deleted by xid >= xmin ! */
LOCK *waitLock; /* Lock we're sleeping on */ LOCK *waitLock; /* Lock we're sleeping on ... */
int token; /* info for proc wakeup routines */ int token; /* type of lock we sleeping for */
int holdLock; /* while holding these locks */
int pid; /* This procs process id */ int pid; /* This procs process id */
short sLocks[MAX_SPINS]; /* Spin lock stats */ short sLocks[MAX_SPINS]; /* Spin lock stats */
SHM_QUEUE lockQueue; /* locks associated with current SHM_QUEUE lockQueue; /* locks associated with current
...@@ -116,8 +117,8 @@ extern bool ProcRemove(int pid); ...@@ -116,8 +117,8 @@ extern bool ProcRemove(int pid);
/* make static in storage/lmgr/proc.c -- jolly */ /* make static in storage/lmgr/proc.c -- jolly */
extern void ProcQueueInit(PROC_QUEUE *queue); extern void ProcQueueInit(PROC_QUEUE *queue);
extern int ProcSleep(PROC_QUEUE *queue, SPINLOCK spinlock, int token, extern int ProcSleep(PROC_QUEUE *queue, LOCKMETHODCTL *lockctl, int token,
int prio, LOCK *lock, TransactionId xid); LOCK *lock);
extern int ProcLockWakeup(PROC_QUEUE *queue, LOCKMETHOD lockmethod, extern int ProcLockWakeup(PROC_QUEUE *queue, LOCKMETHOD lockmethod,
LOCK *lock); LOCK *lock);
extern void ProcAddLock(SHM_QUEUE *elem); extern void ProcAddLock(SHM_QUEUE *elem);
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment