Commit 76355439 authored by Tom Lane's avatar Tom Lane

Fix code so that we recover cleanly if there are no free semaphores

available in freeSemMap.  As noted by Tatsuo, this is now a likely
scenario for detecting MaxBackends-exceeded; if MaxBackends is a multiple
of PROC_NSEMS_PER_SET then we will fail here and not in sinval.c.  The
cleanup path did not work correctly before, anyway.
parent 29c22eec
...@@ -8,7 +8,7 @@ ...@@ -8,7 +8,7 @@
* *
* *
* IDENTIFICATION * IDENTIFICATION
* $Header: /cvsroot/pgsql/src/backend/storage/lmgr/proc.c,v 1.105 2001/09/04 02:26:57 tgl Exp $ * $Header: /cvsroot/pgsql/src/backend/storage/lmgr/proc.c,v 1.106 2001/09/04 21:42:17 tgl Exp $
* *
*------------------------------------------------------------------------- *-------------------------------------------------------------------------
*/ */
...@@ -186,6 +186,17 @@ InitProcess(void) ...@@ -186,6 +186,17 @@ InitProcess(void)
unsigned long location, unsigned long location,
myOffset; myOffset;
/*
* ProcStructLock protects the freelist of PROC entries and the map
* of free semaphores. Note that when we acquire it here, we do not
* have a PROC entry and so the ownership of the spinlock is not
* recorded anywhere; even if it was, until we register ProcKill as
* an on_shmem_exit callback, there is no exit hook that will cause
* owned spinlocks to be released. Upshot: during the first part of
* this routine, be careful to release the lock manually before any
* elog(), else you'll have a stuck spinlock to add to your woes.
*/
SpinAcquire(ProcStructLock); SpinAcquire(ProcStructLock);
/* attach to the ProcGlobal structure */ /* attach to the ProcGlobal structure */
...@@ -194,13 +205,14 @@ InitProcess(void) ...@@ -194,13 +205,14 @@ InitProcess(void)
if (!found) if (!found)
{ {
/* this should not happen. InitProcGlobal() is called before this. */ /* this should not happen. InitProcGlobal() is called before this. */
SpinRelease(ProcStructLock);
elog(STOP, "InitProcess: Proc Header uninitialized"); elog(STOP, "InitProcess: Proc Header uninitialized");
} }
if (MyProc != NULL) if (MyProc != NULL)
{ {
SpinRelease(ProcStructLock); SpinRelease(ProcStructLock);
elog(ERROR, "ProcInit: you already exist"); elog(ERROR, "InitProcess: you already exist");
} }
/* try to get a proc struct from the free list first */ /* try to get a proc struct from the free list first */
...@@ -214,14 +226,12 @@ InitProcess(void) ...@@ -214,14 +226,12 @@ InitProcess(void)
} }
else else
{ {
/* /*
* have to allocate one. We can't use the normal shmem index * have to allocate one. We can't use the normal shmem index
* table mechanism because the proc structure is stored by PID * table mechanism because the proc structure is stored by PID
* instead of by a global name (need to look it up by PID when we * instead of by a global name (need to look it up by PID when we
* cleanup dead processes). * cleanup dead processes).
*/ */
MyProc = (PROC *) ShmemAlloc(sizeof(PROC)); MyProc = (PROC *) ShmemAlloc(sizeof(PROC));
if (!MyProc) if (!MyProc)
{ {
...@@ -231,44 +241,32 @@ InitProcess(void) ...@@ -231,44 +241,32 @@ InitProcess(void)
} }
/* /*
* zero out the spin lock counts and set the sLocks field for * Initialize all fields of MyProc.
* ProcStructLock to 1 as we have acquired this spinlock above but
* didn't record it since we didn't have MyProc until now.
*/
MemSet(MyProc->sLocks, 0, sizeof(MyProc->sLocks));
MyProc->sLocks[ProcStructLock] = 1;
/*
* Set up a wait-semaphore for the proc.
*/ */
if (IsUnderPostmaster)
{
ProcGetNewSemIdAndNum(&MyProc->sem.semId, &MyProc->sem.semNum);
/*
* we might be reusing a semaphore that belongs to a dead backend.
* So be careful and reinitialize its value here.
*/
ZeroProcSemaphore(MyProc);
}
else
{
MyProc->sem.semId = -1;
MyProc->sem.semNum = -1;
}
SHMQueueElemInit(&(MyProc->links)); SHMQueueElemInit(&(MyProc->links));
MyProc->sem.semId = -1; /* no wait-semaphore acquired yet */
MyProc->sem.semNum = -1;
MyProc->errType = STATUS_OK; MyProc->errType = STATUS_OK;
MyProc->pid = MyProcPid;
MyProc->databaseId = MyDatabaseId;
MyProc->xid = InvalidTransactionId; MyProc->xid = InvalidTransactionId;
MyProc->xmin = InvalidTransactionId; MyProc->xmin = InvalidTransactionId;
MyProc->logRec.xrecoff = 0;
MyProc->waitLock = NULL; MyProc->waitLock = NULL;
MyProc->waitHolder = NULL; MyProc->waitHolder = NULL;
MyProc->pid = MyProcPid;
MyProc->databaseId = MyDatabaseId;
SHMQueueInit(&(MyProc->procHolders)); SHMQueueInit(&(MyProc->procHolders));
/*
* Zero out the spin lock counts and set the sLocks field for
* ProcStructLock to 1 as we have acquired this spinlock above but
* didn't record it since we didn't have MyProc until now.
*/
MemSet(MyProc->sLocks, 0, sizeof(MyProc->sLocks));
MyProc->sLocks[ProcStructLock] = 1;
/* /*
* Release the lock. * Release the lock while accessing shmem index; we still haven't
* installed ProcKill and so we don't want to hold lock if there's
* an error.
*/ */
SpinRelease(ProcStructLock); SpinRelease(ProcStructLock);
...@@ -283,10 +281,28 @@ InitProcess(void) ...@@ -283,10 +281,28 @@ InitProcess(void)
elog(STOP, "InitProcess: ShmemPID table broken"); elog(STOP, "InitProcess: ShmemPID table broken");
/* /*
* Arrange to clean up at backend exit. * Arrange to clean up at backend exit. Once we do this, owned
* spinlocks will be released on exit, and so we can be a lot less
* tense about errors.
*/ */
on_shmem_exit(ProcKill, 0); on_shmem_exit(ProcKill, 0);
/*
* Set up a wait-semaphore for the proc. (Do this last so that we
* can rely on ProcKill to clean up if it fails.)
*/
if (IsUnderPostmaster)
{
SpinAcquire(ProcStructLock);
ProcGetNewSemIdAndNum(&MyProc->sem.semId, &MyProc->sem.semNum);
SpinRelease(ProcStructLock);
/*
* We might be reusing a semaphore that belongs to a dead backend.
* So be careful and reinitialize its value here.
*/
ZeroProcSemaphore(MyProc);
}
/* /*
* Now that we have a PROC, we could try to acquire locks, so * Now that we have a PROC, we could try to acquire locks, so
* initialize the deadlock checker. * initialize the deadlock checker.
...@@ -425,8 +441,9 @@ ProcKill(void) ...@@ -425,8 +441,9 @@ ProcKill(void)
SpinAcquire(ProcStructLock); SpinAcquire(ProcStructLock);
/* Free up my wait semaphore */ /* Free up my wait semaphore, if I got one */
ProcFreeSem(MyProc->sem.semId, MyProc->sem.semNum); if (MyProc->sem.semId >= 0)
ProcFreeSem(MyProc->sem.semId, MyProc->sem.semNum);
/* Add PROC struct to freelist so space can be recycled in future */ /* Add PROC struct to freelist so space can be recycled in future */
MyProc->links.next = ProcGlobal->freeProcs; MyProc->links.next = ProcGlobal->freeProcs;
...@@ -993,10 +1010,7 @@ ProcGetNewSemIdAndNum(IpcSemaphoreId *semId, int *semNum) ...@@ -993,10 +1010,7 @@ ProcGetNewSemIdAndNum(IpcSemaphoreId *semId, int *semNum)
{ {
if ((freeSemMap[i] & mask) == 0) if ((freeSemMap[i] & mask) == 0)
{ {
/* A free semaphore found. Mark it as allocated. */
/*
* a free semaphore found. Mark it as allocated.
*/
freeSemMap[i] |= mask; freeSemMap[i] |= mask;
*semId = procSemIds[i]; *semId = procSemIds[i];
...@@ -1007,8 +1021,13 @@ ProcGetNewSemIdAndNum(IpcSemaphoreId *semId, int *semNum) ...@@ -1007,8 +1021,13 @@ ProcGetNewSemIdAndNum(IpcSemaphoreId *semId, int *semNum)
} }
} }
/* if we reach here, all the semaphores are in use. */ /*
elog(ERROR, "ProcGetNewSemIdAndNum: cannot allocate a free semaphore"); * If we reach here, all the semaphores are in use. This is one of the
* possible places to detect "too many backends", so give the standard
* error message. (Whether we detect it here or in sinval.c depends on
* whether MaxBackends is a multiple of PROC_NSEMS_PER_SET.)
*/
elog(FATAL, "Sorry, too many clients already");
} }
/* /*
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment