Commit 6f2871f1 authored by Robert Haas's avatar Robert Haas

Centralize decision-making about where to get a backend's PGPROC.

This code was originally written as part of parallel query effort, but
it seems to have independent value, because if we make one decision
about where to get a PGPROC when we allocate and then put it back on a
different list at backend-exit time, bad things happen.  This isn't
just a theoretical risk; we fixed an actual problem of this type in
commit e280c630.
parent 95f4e59c
...@@ -242,18 +242,21 @@ InitProcGlobal(void) ...@@ -242,18 +242,21 @@ InitProcGlobal(void)
/* PGPROC for normal backend, add to freeProcs list */ /* PGPROC for normal backend, add to freeProcs list */
procs[i].links.next = (SHM_QUEUE *) ProcGlobal->freeProcs; procs[i].links.next = (SHM_QUEUE *) ProcGlobal->freeProcs;
ProcGlobal->freeProcs = &procs[i]; ProcGlobal->freeProcs = &procs[i];
procs[i].procgloballist = &ProcGlobal->freeProcs;
} }
else if (i < MaxConnections + autovacuum_max_workers + 1) else if (i < MaxConnections + autovacuum_max_workers + 1)
{ {
/* PGPROC for AV launcher/worker, add to autovacFreeProcs list */ /* PGPROC for AV launcher/worker, add to autovacFreeProcs list */
procs[i].links.next = (SHM_QUEUE *) ProcGlobal->autovacFreeProcs; procs[i].links.next = (SHM_QUEUE *) ProcGlobal->autovacFreeProcs;
ProcGlobal->autovacFreeProcs = &procs[i]; ProcGlobal->autovacFreeProcs = &procs[i];
procs[i].procgloballist = &ProcGlobal->autovacFreeProcs;
} }
else if (i < MaxBackends) else if (i < MaxBackends)
{ {
/* PGPROC for bgworker, add to bgworkerFreeProcs list */ /* PGPROC for bgworker, add to bgworkerFreeProcs list */
procs[i].links.next = (SHM_QUEUE *) ProcGlobal->bgworkerFreeProcs; procs[i].links.next = (SHM_QUEUE *) ProcGlobal->bgworkerFreeProcs;
ProcGlobal->bgworkerFreeProcs = &procs[i]; ProcGlobal->bgworkerFreeProcs = &procs[i];
procs[i].procgloballist = &ProcGlobal->bgworkerFreeProcs;
} }
/* Initialize myProcLocks[] shared memory queues. */ /* Initialize myProcLocks[] shared memory queues. */
...@@ -281,6 +284,7 @@ InitProcess(void) ...@@ -281,6 +284,7 @@ InitProcess(void)
{ {
/* use volatile pointer to prevent code rearrangement */ /* use volatile pointer to prevent code rearrangement */
volatile PROC_HDR *procglobal = ProcGlobal; volatile PROC_HDR *procglobal = ProcGlobal;
PGPROC * volatile * procgloballist;
/* /*
* ProcGlobal should be set up already (if we are a backend, we inherit * ProcGlobal should be set up already (if we are a backend, we inherit
...@@ -292,9 +296,17 @@ InitProcess(void) ...@@ -292,9 +296,17 @@ InitProcess(void)
if (MyProc != NULL) if (MyProc != NULL)
elog(ERROR, "you already exist"); elog(ERROR, "you already exist");
/* Decide which list should supply our PGPROC. */
if (IsAnyAutoVacuumProcess())
procgloballist = &procglobal->autovacFreeProcs;
else if (IsBackgroundWorker)
procgloballist = &procglobal->bgworkerFreeProcs;
else
procgloballist = &procglobal->freeProcs;
/* /*
* Try to get a proc struct from the free list. If this fails, we must be * Try to get a proc struct from the appropriate free list. If this
* out of PGPROC structures (not to mention semaphores). * fails, we must be out of PGPROC structures (not to mention semaphores).
* *
* While we are holding the ProcStructLock, also copy the current shared * While we are holding the ProcStructLock, also copy the current shared
* estimate of spins_per_delay to local storage. * estimate of spins_per_delay to local storage.
...@@ -303,21 +315,11 @@ InitProcess(void) ...@@ -303,21 +315,11 @@ InitProcess(void)
set_spins_per_delay(procglobal->spins_per_delay); set_spins_per_delay(procglobal->spins_per_delay);
if (IsAnyAutoVacuumProcess()) MyProc = *procgloballist;
MyProc = procglobal->autovacFreeProcs;
else if (IsBackgroundWorker)
MyProc = procglobal->bgworkerFreeProcs;
else
MyProc = procglobal->freeProcs;
if (MyProc != NULL) if (MyProc != NULL)
{ {
if (IsAnyAutoVacuumProcess()) *procgloballist = (PGPROC *) MyProc->links.next;
procglobal->autovacFreeProcs = (PGPROC *) MyProc->links.next;
else if (IsBackgroundWorker)
procglobal->bgworkerFreeProcs = (PGPROC *) MyProc->links.next;
else
procglobal->freeProcs = (PGPROC *) MyProc->links.next;
SpinLockRelease(ProcStructLock); SpinLockRelease(ProcStructLock);
} }
else else
...@@ -335,6 +337,12 @@ InitProcess(void) ...@@ -335,6 +337,12 @@ InitProcess(void)
} }
MyPgXact = &ProcGlobal->allPgXact[MyProc->pgprocno]; MyPgXact = &ProcGlobal->allPgXact[MyProc->pgprocno];
/*
* Cross-check that the PGPROC is of the type we expect; if this were
* not the case, it would get returned to the wrong list.
*/
Assert(MyProc->procgloballist == procgloballist);
/* /*
* Now that we have a PGPROC, mark ourselves as an active postmaster * Now that we have a PGPROC, mark ourselves as an active postmaster
* child; this is so that the postmaster can detect it if we exit without * child; this is so that the postmaster can detect it if we exit without
...@@ -761,6 +769,7 @@ ProcKill(int code, Datum arg) ...@@ -761,6 +769,7 @@ ProcKill(int code, Datum arg)
/* use volatile pointer to prevent code rearrangement */ /* use volatile pointer to prevent code rearrangement */
volatile PROC_HDR *procglobal = ProcGlobal; volatile PROC_HDR *procglobal = ProcGlobal;
PGPROC *proc; PGPROC *proc;
PGPROC * volatile * procgloballist;
Assert(MyProc != NULL); Assert(MyProc != NULL);
...@@ -799,24 +808,12 @@ ProcKill(int code, Datum arg) ...@@ -799,24 +808,12 @@ ProcKill(int code, Datum arg)
MyProc = NULL; MyProc = NULL;
DisownLatch(&proc->procLatch); DisownLatch(&proc->procLatch);
procgloballist = proc->procgloballist;
SpinLockAcquire(ProcStructLock); SpinLockAcquire(ProcStructLock);
/* Return PGPROC structure (and semaphore) to appropriate freelist */ /* Return PGPROC structure (and semaphore) to appropriate freelist */
if (IsAnyAutoVacuumProcess()) proc->links.next = (SHM_QUEUE *) *procgloballist;
{ *procgloballist = proc;
proc->links.next = (SHM_QUEUE *) procglobal->autovacFreeProcs;
procglobal->autovacFreeProcs = proc;
}
else if (IsBackgroundWorker)
{
proc->links.next = (SHM_QUEUE *) procglobal->bgworkerFreeProcs;
procglobal->bgworkerFreeProcs = proc;
}
else
{
proc->links.next = (SHM_QUEUE *) procglobal->freeProcs;
procglobal->freeProcs = proc;
}
/* Update shared estimate of spins_per_delay */ /* Update shared estimate of spins_per_delay */
procglobal->spins_per_delay = update_spins_per_delay(procglobal->spins_per_delay); procglobal->spins_per_delay = update_spins_per_delay(procglobal->spins_per_delay);
......
...@@ -78,6 +78,7 @@ struct PGPROC ...@@ -78,6 +78,7 @@ struct PGPROC
{ {
/* proc->links MUST BE FIRST IN STRUCT (see ProcSleep,ProcWakeup,etc) */ /* proc->links MUST BE FIRST IN STRUCT (see ProcSleep,ProcWakeup,etc) */
SHM_QUEUE links; /* list link if process is in a list */ SHM_QUEUE links; /* list link if process is in a list */
PGPROC **procgloballist; /* procglobal list that owns this PGPROC */
PGSemaphoreData sem; /* ONE semaphore to sleep on */ PGSemaphoreData sem; /* ONE semaphore to sleep on */
int waitStatus; /* STATUS_WAITING, STATUS_OK or STATUS_ERROR */ int waitStatus; /* STATUS_WAITING, STATUS_OK or STATUS_ERROR */
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment