Commit d8e950d3 authored by Andres Freund's avatar Andres Freund

Improve and cleanup ProcArrayAdd(), ProcArrayRemove().

941697c3 changed ProcArrayAdd()/Remove() substantially. As reported by
zhanyi, I missed that due to the introduction of the PGPROC->pgxactoff
ProcArrayRemove() does not need to search for the position in
procArray->pgprocnos anymore - that's pgxactoff. Remove the search loop.

The removal of the search loop reduces assertion coverage a bit - but we can
easily do better than before by adding assertions to other loops over
pgprocnos elements.

Also do a bit of cleanup, mostly by reducing line lengths by introducing local
helper variables and adding newlines.

Author: zhanyi <w@hidva.com>
Author: Andres Freund <andres@anarazel.de>
Discussion: https://postgr.es/m/tencent_5624AA3B116B3D1C31CA9744@qq.com
parent d120e66f
......@@ -446,6 +446,7 @@ ProcArrayAdd(PGPROC *proc)
{
ProcArrayStruct *arrayP = procArray;
int index;
int movecount;
/* See ProcGlobal comment explaining why both locks are held */
LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE);
......@@ -474,33 +475,48 @@ ProcArrayAdd(PGPROC *proc)
*/
for (index = 0; index < arrayP->numProcs; index++)
{
/*
* If we are the first PGPROC or if we have found our right position
* in the array, break
*/
if ((arrayP->pgprocnos[index] == -1) || (arrayP->pgprocnos[index] > proc->pgprocno))
int procno PG_USED_FOR_ASSERTS_ONLY = arrayP->pgprocnos[index];
Assert(procno >= 0 && procno < (arrayP->maxProcs + NUM_AUXILIARY_PROCS));
Assert(allProcs[procno].pgxactoff == index);
/* If we have found our right position in the array, break */
if (arrayP->pgprocnos[index] > proc->pgprocno)
break;
}
memmove(&arrayP->pgprocnos[index + 1], &arrayP->pgprocnos[index],
(arrayP->numProcs - index) * sizeof(*arrayP->pgprocnos));
memmove(&ProcGlobal->xids[index + 1], &ProcGlobal->xids[index],
(arrayP->numProcs - index) * sizeof(*ProcGlobal->xids));
memmove(&ProcGlobal->subxidStates[index + 1], &ProcGlobal->subxidStates[index],
(arrayP->numProcs - index) * sizeof(*ProcGlobal->subxidStates));
memmove(&ProcGlobal->statusFlags[index + 1], &ProcGlobal->statusFlags[index],
(arrayP->numProcs - index) * sizeof(*ProcGlobal->statusFlags));
movecount = arrayP->numProcs - index;
memmove(&arrayP->pgprocnos[index + 1],
&arrayP->pgprocnos[index],
movecount * sizeof(*arrayP->pgprocnos));
memmove(&ProcGlobal->xids[index + 1],
&ProcGlobal->xids[index],
movecount * sizeof(*ProcGlobal->xids));
memmove(&ProcGlobal->subxidStates[index + 1],
&ProcGlobal->subxidStates[index],
movecount * sizeof(*ProcGlobal->subxidStates));
memmove(&ProcGlobal->statusFlags[index + 1],
&ProcGlobal->statusFlags[index],
movecount * sizeof(*ProcGlobal->statusFlags));
arrayP->pgprocnos[index] = proc->pgprocno;
proc->pgxactoff = index;
ProcGlobal->xids[index] = proc->xid;
ProcGlobal->subxidStates[index] = proc->subxidStatus;
ProcGlobal->statusFlags[index] = proc->statusFlags;
arrayP->numProcs++;
/* adjust pgxactoff for all following PGPROCs */
index++;
for (; index < arrayP->numProcs; index++)
{
allProcs[arrayP->pgprocnos[index]].pgxactoff = index;
int procno = arrayP->pgprocnos[index];
Assert(procno >= 0 && procno < (arrayP->maxProcs + NUM_AUXILIARY_PROCS));
Assert(allProcs[procno].pgxactoff == index - 1);
allProcs[procno].pgxactoff = index;
}
/*
......@@ -525,7 +541,8 @@ void
ProcArrayRemove(PGPROC *proc, TransactionId latestXid)
{
ProcArrayStruct *arrayP = procArray;
int index;
int myoff;
int movecount;
#ifdef XIDCACHE_DEBUG
/* dump stats at backend shutdown, but not prepared-xact end */
......@@ -537,11 +554,14 @@ ProcArrayRemove(PGPROC *proc, TransactionId latestXid)
LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE);
LWLockAcquire(XidGenLock, LW_EXCLUSIVE);
Assert(ProcGlobal->allProcs[arrayP->pgprocnos[proc->pgxactoff]].pgxactoff == proc->pgxactoff);
myoff = proc->pgxactoff;
Assert(myoff >= 0 && myoff < arrayP->numProcs);
Assert(ProcGlobal->allProcs[arrayP->pgprocnos[myoff]].pgxactoff == myoff);
if (TransactionIdIsValid(latestXid))
{
Assert(TransactionIdIsValid(ProcGlobal->xids[proc->pgxactoff]));
Assert(TransactionIdIsValid(ProcGlobal->xids[myoff]));
/* Advance global latestCompletedXid while holding the lock */
MaintainLatestCompletedXid(latestXid);
......@@ -549,57 +569,60 @@ ProcArrayRemove(PGPROC *proc, TransactionId latestXid)
/* Same with xactCompletionCount */
ShmemVariableCache->xactCompletionCount++;
ProcGlobal->xids[proc->pgxactoff] = 0;
ProcGlobal->subxidStates[proc->pgxactoff].overflowed = false;
ProcGlobal->subxidStates[proc->pgxactoff].count = 0;
ProcGlobal->xids[myoff] = InvalidTransactionId;
ProcGlobal->subxidStates[myoff].overflowed = false;
ProcGlobal->subxidStates[myoff].count = 0;
}
else
{
/* Shouldn't be trying to remove a live transaction here */
Assert(!TransactionIdIsValid(ProcGlobal->xids[proc->pgxactoff]));
Assert(!TransactionIdIsValid(ProcGlobal->xids[myoff]));
}
Assert(TransactionIdIsValid(ProcGlobal->xids[proc->pgxactoff] == 0));
Assert(TransactionIdIsValid(ProcGlobal->subxidStates[proc->pgxactoff].count == 0));
Assert(TransactionIdIsValid(ProcGlobal->subxidStates[proc->pgxactoff].overflowed == false));
ProcGlobal->statusFlags[proc->pgxactoff] = 0;
Assert(!TransactionIdIsValid(ProcGlobal->xids[myoff]));
Assert(ProcGlobal->subxidStates[myoff].count == 0);
Assert(ProcGlobal->subxidStates[myoff].overflowed == false);
ProcGlobal->statusFlags[myoff] = 0;
for (index = 0; index < arrayP->numProcs; index++)
{
if (arrayP->pgprocnos[index] == proc->pgprocno)
{
/* Keep the PGPROC array sorted. See notes above */
memmove(&arrayP->pgprocnos[index], &arrayP->pgprocnos[index + 1],
(arrayP->numProcs - index - 1) * sizeof(*arrayP->pgprocnos));
memmove(&ProcGlobal->xids[index], &ProcGlobal->xids[index + 1],
(arrayP->numProcs - index - 1) * sizeof(*ProcGlobal->xids));
memmove(&ProcGlobal->subxidStates[index], &ProcGlobal->subxidStates[index + 1],
(arrayP->numProcs - index - 1) * sizeof(*ProcGlobal->subxidStates));
memmove(&ProcGlobal->statusFlags[index], &ProcGlobal->statusFlags[index + 1],
(arrayP->numProcs - index - 1) * sizeof(*ProcGlobal->statusFlags));
movecount = arrayP->numProcs - myoff - 1;
memmove(&arrayP->pgprocnos[myoff],
&arrayP->pgprocnos[myoff + 1],
movecount * sizeof(*arrayP->pgprocnos));
memmove(&ProcGlobal->xids[myoff],
&ProcGlobal->xids[myoff + 1],
movecount * sizeof(*ProcGlobal->xids));
memmove(&ProcGlobal->subxidStates[myoff],
&ProcGlobal->subxidStates[myoff + 1],
movecount * sizeof(*ProcGlobal->subxidStates));
memmove(&ProcGlobal->statusFlags[myoff],
&ProcGlobal->statusFlags[myoff + 1],
movecount * sizeof(*ProcGlobal->statusFlags));
arrayP->pgprocnos[arrayP->numProcs - 1] = -1; /* for debugging */
arrayP->numProcs--;
/* adjust for removed PGPROC */
for (; index < arrayP->numProcs; index++)
allProcs[arrayP->pgprocnos[index]].pgxactoff--;
/*
* Release in reversed acquisition order, to reduce frequency of
* having to wait for XidGenLock while holding ProcArrayLock.
* Adjust pgxactoff of following procs for removed PGPROC (note that
* numProcs already has been decremented).
*/
LWLockRelease(XidGenLock);
LWLockRelease(ProcArrayLock);
return;
}
for (int index = myoff; index < arrayP->numProcs; index++)
{
int procno = arrayP->pgprocnos[index];
Assert(procno >= 0 && procno < (arrayP->maxProcs + NUM_AUXILIARY_PROCS));
Assert(allProcs[procno].pgxactoff - 1 == index);
allProcs[procno].pgxactoff = index;
}
/* Oops */
/*
* Release in reversed acquisition order, to reduce frequency of having to
* wait for XidGenLock while holding ProcArrayLock.
*/
LWLockRelease(XidGenLock);
LWLockRelease(ProcArrayLock);
elog(LOG, "failed to find proc %p in ProcArray", proc);
}
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment