Commit 4aec4989 authored by Robert Haas's avatar Robert Haas

Assorted code review for recent ProcArrayLock patch.

Post-commit review by Andres Freund discovered a couple of concurrency
bugs in the original patch: specifically, if the leader cleared a
follower's XID before it reached PGSemaphoreLock, the semaphore would be
left in the wrong state; and if another process did PGSemaphoreUnlock
for some unrelated reason, we might resume execution before the fact
that our XID was cleared was globally visible.

Also, improve the wording of some comments, rename nextClearXidElem
to firstClearXidElem in PROC_HDR for clarity, and drop some volatile
qualifiers that aren't necessary.

Amit Kapila, reviewed and slightly revised by me.
parent 1ea5ce5c
...@@ -470,10 +470,10 @@ ProcArrayEndTransactionInternal(PGPROC *proc, PGXACT *pgxact, ...@@ -470,10 +470,10 @@ ProcArrayEndTransactionInternal(PGPROC *proc, PGXACT *pgxact,
* commit time, add ourselves to a list of processes that need their XIDs * commit time, add ourselves to a list of processes that need their XIDs
* cleared. The first process to add itself to the list will acquire * cleared. The first process to add itself to the list will acquire
* ProcArrayLock in exclusive mode and perform ProcArrayEndTransactionInternal * ProcArrayLock in exclusive mode and perform ProcArrayEndTransactionInternal
* on behalf of all group members. This avoids a great deal of context * on behalf of all group members. This avoids a great deal of contention
* switching when many processes are trying to commit at once, since the lock * around ProcArrayLock when many processes are trying to commit at once,
* only needs to be handed from the last share-locker to one process waiting * since the lock need not be repeatedly handed off from one committing
* for the exclusive lock, rather than to each one in turn. * process to the next.
*/ */
static void static void
ProcArrayGroupClearXid(PGPROC *proc, TransactionId latestXid) ProcArrayGroupClearXid(PGPROC *proc, TransactionId latestXid)
...@@ -487,28 +487,39 @@ ProcArrayGroupClearXid(PGPROC *proc, TransactionId latestXid) ...@@ -487,28 +487,39 @@ ProcArrayGroupClearXid(PGPROC *proc, TransactionId latestXid)
Assert(TransactionIdIsValid(allPgXact[proc->pgprocno].xid)); Assert(TransactionIdIsValid(allPgXact[proc->pgprocno].xid));
/* Add ourselves to the list of processes needing a group XID clear. */ /* Add ourselves to the list of processes needing a group XID clear. */
proc->clearXid = true;
proc->backendLatestXid = latestXid; proc->backendLatestXid = latestXid;
while (true) while (true)
{ {
nextidx = pg_atomic_read_u32(&procglobal->nextClearXidElem); nextidx = pg_atomic_read_u32(&procglobal->firstClearXidElem);
pg_atomic_write_u32(&proc->nextClearXidElem, nextidx); pg_atomic_write_u32(&proc->nextClearXidElem, nextidx);
if (pg_atomic_compare_exchange_u32(&procglobal->nextClearXidElem, if (pg_atomic_compare_exchange_u32(&procglobal->firstClearXidElem,
&nextidx, &nextidx,
(uint32) proc->pgprocno)) (uint32) proc->pgprocno))
break; break;
} }
/* If the list was not empty, the leader will clear our XID. */ /*
* If the list was not empty, the leader will clear our XID. It is
* impossible to have followers without a leader because the first process
* that has added itself to the list will always have nextidx as
* INVALID_PGPROCNO.
*/
if (nextidx != INVALID_PGPROCNO) if (nextidx != INVALID_PGPROCNO)
{ {
/* Sleep until the leader clears our XID. */ /* Sleep until the leader clears our XID. */
while (pg_atomic_read_u32(&proc->nextClearXidElem) != INVALID_PGPROCNO) for (;;)
{ {
extraWaits++; /* acts as a read barrier */
PGSemaphoreLock(&proc->sem); PGSemaphoreLock(&proc->sem);
if (!proc->clearXid)
break;
extraWaits++;
} }
Assert(pg_atomic_read_u32(&proc->nextClearXidElem) == INVALID_PGPROCNO);
/* Fix semaphore count for any absorbed wakeups */ /* Fix semaphore count for any absorbed wakeups */
while (extraWaits-- > 0) while (extraWaits-- > 0)
PGSemaphoreUnlock(&proc->sem); PGSemaphoreUnlock(&proc->sem);
...@@ -520,12 +531,13 @@ ProcArrayGroupClearXid(PGPROC *proc, TransactionId latestXid) ...@@ -520,12 +531,13 @@ ProcArrayGroupClearXid(PGPROC *proc, TransactionId latestXid)
/* /*
* Now that we've got the lock, clear the list of processes waiting for * Now that we've got the lock, clear the list of processes waiting for
* group XID clearing, saving a pointer to the head of the list. * group XID clearing, saving a pointer to the head of the list. Trying
* to pop elements one at a time could lead to an ABA problem.
*/ */
while (true) while (true)
{ {
nextidx = pg_atomic_read_u32(&procglobal->nextClearXidElem); nextidx = pg_atomic_read_u32(&procglobal->firstClearXidElem);
if (pg_atomic_compare_exchange_u32(&procglobal->nextClearXidElem, if (pg_atomic_compare_exchange_u32(&procglobal->firstClearXidElem,
&nextidx, &nextidx,
INVALID_PGPROCNO)) INVALID_PGPROCNO))
break; break;
...@@ -563,6 +575,11 @@ ProcArrayGroupClearXid(PGPROC *proc, TransactionId latestXid) ...@@ -563,6 +575,11 @@ ProcArrayGroupClearXid(PGPROC *proc, TransactionId latestXid)
wakeidx = pg_atomic_read_u32(&proc->nextClearXidElem); wakeidx = pg_atomic_read_u32(&proc->nextClearXidElem);
pg_atomic_write_u32(&proc->nextClearXidElem, INVALID_PGPROCNO); pg_atomic_write_u32(&proc->nextClearXidElem, INVALID_PGPROCNO);
/* ensure all previous writes are visible before follower continues. */
pg_write_barrier();
proc->clearXid = false;
if (proc != MyProc) if (proc != MyProc)
PGSemaphoreUnlock(&proc->sem); PGSemaphoreUnlock(&proc->sem);
} }
......
...@@ -181,7 +181,7 @@ InitProcGlobal(void) ...@@ -181,7 +181,7 @@ InitProcGlobal(void)
ProcGlobal->startupBufferPinWaitBufId = -1; ProcGlobal->startupBufferPinWaitBufId = -1;
ProcGlobal->walwriterLatch = NULL; ProcGlobal->walwriterLatch = NULL;
ProcGlobal->checkpointerLatch = NULL; ProcGlobal->checkpointerLatch = NULL;
pg_atomic_init_u32(&ProcGlobal->nextClearXidElem, INVALID_PGPROCNO); pg_atomic_init_u32(&ProcGlobal->firstClearXidElem, INVALID_PGPROCNO);
/* /*
* Create and initialize all the PGPROC structures we'll need. There are * Create and initialize all the PGPROC structures we'll need. There are
...@@ -395,6 +395,7 @@ InitProcess(void) ...@@ -395,6 +395,7 @@ InitProcess(void)
SHMQueueElemInit(&(MyProc->syncRepLinks)); SHMQueueElemInit(&(MyProc->syncRepLinks));
/* Initialize fields for group XID clearing. */ /* Initialize fields for group XID clearing. */
MyProc->clearXid = false;
MyProc->backendLatestXid = InvalidTransactionId; MyProc->backendLatestXid = InvalidTransactionId;
pg_atomic_init_u32(&MyProc->nextClearXidElem, INVALID_PGPROCNO); pg_atomic_init_u32(&MyProc->nextClearXidElem, INVALID_PGPROCNO);
......
...@@ -142,7 +142,8 @@ struct PGPROC ...@@ -142,7 +142,8 @@ struct PGPROC
struct XidCache subxids; /* cache for subtransaction XIDs */ struct XidCache subxids; /* cache for subtransaction XIDs */
/* Support for group XID clearing. */ /* Support for group XID clearing. */
volatile pg_atomic_uint32 nextClearXidElem; bool clearXid;
pg_atomic_uint32 nextClearXidElem;
TransactionId backendLatestXid; TransactionId backendLatestXid;
/* Per-backend LWLock. Protects fields below. */ /* Per-backend LWLock. Protects fields below. */
...@@ -207,7 +208,7 @@ typedef struct PROC_HDR ...@@ -207,7 +208,7 @@ typedef struct PROC_HDR
/* Head of list of bgworker free PGPROC structures */ /* Head of list of bgworker free PGPROC structures */
PGPROC *bgworkerFreeProcs; PGPROC *bgworkerFreeProcs;
/* First pgproc waiting for group XID clear */ /* First pgproc waiting for group XID clear */
volatile pg_atomic_uint32 nextClearXidElem; pg_atomic_uint32 firstClearXidElem;
/* WALWriter process's latch */ /* WALWriter process's latch */
Latch *walwriterLatch; Latch *walwriterLatch;
/* Checkpointer process's latch */ /* Checkpointer process's latch */
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment