Commit 9b38d46d authored by Heikki Linnakangas's avatar Heikki Linnakangas

Make group commit more effective.

When a backend needs to flush the WAL, and someone else is already flushing
the WAL, wait until it releases the WALInsertLock and check if we still need
to do the flush or if the other backend already did the work for us, before
acquiring WALInsertLock. This helps group commit, because when the WAL flush
finishes, all the backends that were waiting for it can be woken up in one
go, and the can all concurrently observe that they're done, rather than
waking them up one by one in a cascading fashion.

This is based on a new LWLock function, LWLockWaitUntilFree(), which has
peculiar semantics. If the lock is immediately free, it grabs the lock and
returns true. If it's not free, it waits until it is released, but then
returns false without grabbing the lock. This is used in XLogFlush(), so
that when the lock is acquired, the backend flushes the WAL, but if it's
not, the backend first checks the current flush location before retrying.

Original patch and benchmarking by Peter Geoghegan and Simon Riggs, although
this patch as committed ended up being very different from that.
parent ba1868ba
......@@ -327,7 +327,7 @@ MarkAsPreparing(TransactionId xid, const char *gid,
proc->databaseId = databaseid;
proc->roleId = owner;
proc->lwWaiting = false;
proc->lwExclusive = false;
proc->lwWaitMode = 0;
proc->lwWaitLink = NULL;
proc->waitLock = NULL;
proc->waitProcLock = NULL;
......
......@@ -2118,23 +2118,43 @@ XLogFlush(XLogRecPtr record)
/* initialize to given target; may increase below */
WriteRqstPtr = record;
/* read LogwrtResult and update local state */
/*
* Now wait until we get the write lock, or someone else does the
* flush for us.
*/
for (;;)
{
/* use volatile pointer to prevent code rearrangement */
volatile XLogCtlData *xlogctl = XLogCtl;
/* read LogwrtResult and update local state */
SpinLockAcquire(&xlogctl->info_lck);
if (XLByteLT(WriteRqstPtr, xlogctl->LogwrtRqst.Write))
WriteRqstPtr = xlogctl->LogwrtRqst.Write;
LogwrtResult = xlogctl->LogwrtResult;
SpinLockRelease(&xlogctl->info_lck);
}
/* done already? */
if (!XLByteLE(record, LogwrtResult.Flush))
{
/* now wait for the write lock */
LWLockAcquire(WALWriteLock, LW_EXCLUSIVE);
/* done already? */
if (XLByteLE(record, LogwrtResult.Flush))
break;
/*
* Try to get the write lock. If we can't get it immediately, wait
* until it's released, and recheck if we still need to do the flush
* or if the backend that held the lock did it for us already. This
* helps to maintain a good rate of group committing when the system
* is bottlenecked by the speed of fsyncing.
*/
if (!LWLockWaitUntilFree(WALWriteLock, LW_EXCLUSIVE))
{
/*
* The lock is now free, but we didn't acquire it yet. Before we
* do, loop back to check if someone else flushed the record for
* us already.
*/
continue;
}
/* Got the lock */
LogwrtResult = XLogCtl->Write.LogwrtResult;
if (!XLByteLE(record, LogwrtResult.Flush))
{
......@@ -2163,6 +2183,8 @@ XLogFlush(XLogRecPtr record)
XLogWrite(WriteRqst, false, false);
}
LWLockRelease(WALWriteLock);
/* done */
break;
}
END_CRIT_SECTION();
......
......@@ -430,7 +430,7 @@ LWLockAcquire(LWLockId lockid, LWLockMode mode)
elog(PANIC, "cannot wait without a PGPROC structure");
proc->lwWaiting = true;
proc->lwExclusive = (mode == LW_EXCLUSIVE);
proc->lwWaitMode = mode;
proc->lwWaitLink = NULL;
if (lock->head == NULL)
lock->head = proc;
......@@ -564,6 +564,144 @@ LWLockConditionalAcquire(LWLockId lockid, LWLockMode mode)
return !mustwait;
}
/*
* LWLockWaitUntilFree - Wait until a lock is free
*
* The semantics of this function are a bit funky. If the lock is currently
* free, it is acquired in the given mode, and the function returns true. If
* the lock isn't immediately free, the function waits until it is released
* and returns false, but does not acquire the lock.
*
* This is currently used for WALWriteLock: when a backend flushes the WAL,
* holding WALWriteLock, it can flush the commit records of many other
* backends as a side-effect. Those other backends need to wait until the
* flush finishes, but don't need to acquire the lock anymore. They can just
* wake up, observe that their records have already been flushed, and return.
*/
bool
LWLockWaitUntilFree(LWLockId lockid, LWLockMode mode)
{
volatile LWLock *lock = &(LWLockArray[lockid].lock);
PGPROC *proc = MyProc;
bool mustwait;
int extraWaits = 0;
PRINT_LWDEBUG("LWLockWaitUntilFree", lockid, lock);
/* Ensure we will have room to remember the lock */
if (num_held_lwlocks >= MAX_SIMUL_LWLOCKS)
elog(ERROR, "too many LWLocks taken");
/*
* Lock out cancel/die interrupts until we exit the code section protected
* by the LWLock. This ensures that interrupts will not interfere with
* manipulations of data structures in shared memory.
*/
HOLD_INTERRUPTS();
/* Acquire mutex. Time spent holding mutex should be short! */
SpinLockAcquire(&lock->mutex);
/* If I can get the lock, do so quickly. */
if (mode == LW_EXCLUSIVE)
{
if (lock->exclusive == 0 && lock->shared == 0)
{
lock->exclusive++;
mustwait = false;
}
else
mustwait = true;
}
else
{
if (lock->exclusive == 0)
{
lock->shared++;
mustwait = false;
}
else
mustwait = true;
}
if (mustwait)
{
/*
* Add myself to wait queue.
*
* If we don't have a PGPROC structure, there's no way to wait. This
* should never occur, since MyProc should only be null during shared
* memory initialization.
*/
if (proc == NULL)
elog(PANIC, "cannot wait without a PGPROC structure");
proc->lwWaiting = true;
proc->lwWaitMode = LW_WAIT_UNTIL_FREE;
proc->lwWaitLink = NULL;
if (lock->head == NULL)
lock->head = proc;
else
lock->tail->lwWaitLink = proc;
lock->tail = proc;
/* Can release the mutex now */
SpinLockRelease(&lock->mutex);
/*
* Wait until awakened. Like in LWLockAcquire, be prepared for bogus
* wakups, because we share the semaphore with ProcWaitForSignal.
*/
LOG_LWDEBUG("LWLockWaitUntilFree", lockid, "waiting");
#ifdef LWLOCK_STATS
block_counts[lockid]++;
#endif
TRACE_POSTGRESQL_LWLOCK_WAIT_START(lockid, mode);
for (;;)
{
/* "false" means cannot accept cancel/die interrupt here. */
PGSemaphoreLock(&proc->sem, false);
if (!proc->lwWaiting)
break;
extraWaits++;
}
TRACE_POSTGRESQL_LWLOCK_WAIT_DONE(lockid, mode);
LOG_LWDEBUG("LWLockWaitUntilFree", lockid, "awakened");
}
else
{
/* We are done updating shared state of the lock itself. */
SpinLockRelease(&lock->mutex);
}
/*
* Fix the process wait semaphore's count for any absorbed wakeups.
*/
while (extraWaits-- > 0)
PGSemaphoreUnlock(&proc->sem);
if (mustwait)
{
/* Failed to get lock, so release interrupt holdoff */
RESUME_INTERRUPTS();
LOG_LWDEBUG("LWLockWaitUntilFree", lockid, "failed");
TRACE_POSTGRESQL_LWLOCK_WAIT_UNTIL_FREE_FAIL(lockid, mode);
}
else
{
/* Add lock to list of locks held by this backend */
held_lwlocks[num_held_lwlocks++] = lockid;
TRACE_POSTGRESQL_LWLOCK_WAIT_UNTIL_FREE(lockid, mode);
}
return !mustwait;
}
/*
* LWLockRelease - release a previously acquired lock
*/
......@@ -618,20 +756,36 @@ LWLockRelease(LWLockId lockid)
/*
* Remove the to-be-awakened PGPROCs from the queue. If the front
* waiter wants exclusive lock, awaken him only. Otherwise awaken
* as many waiters as want shared access.
* as many waiters as want shared access (or just want to be
* woken up when the lock becomes free without acquiring it,
* ie. LWLockWaitUntilFree).
*/
bool releaseOK = true;
proc = head;
if (!proc->lwExclusive)
if (proc->lwWaitMode != LW_EXCLUSIVE)
{
while (proc->lwWaitLink != NULL &&
!proc->lwWaitLink->lwExclusive)
proc->lwWaitLink->lwWaitMode != LW_EXCLUSIVE)
{
proc = proc->lwWaitLink;
if (proc->lwWaitMode != LW_WAIT_UNTIL_FREE)
releaseOK = false;
}
}
/* proc is now the last PGPROC to be released */
lock->head = proc->lwWaitLink;
proc->lwWaitLink = NULL;
/* prevent additional wakeups until retryer gets to run */
lock->releaseOK = false;
/*
* Prevent additional wakeups until retryer gets to run. Backends
* that are just waiting for the lock to become free don't prevent
* wakeups, because they might decide that they don't want the
* lock, after all.
*/
if (proc->lwWaitMode != LW_WAIT_UNTIL_FREE)
releaseOK = false;
lock->releaseOK = releaseOK;
}
else
{
......
......@@ -362,7 +362,7 @@ InitProcess(void)
if (IsAutoVacuumWorkerProcess())
MyPgXact->vacuumFlags |= PROC_IS_AUTOVACUUM;
MyProc->lwWaiting = false;
MyProc->lwExclusive = false;
MyProc->lwWaitMode = 0;
MyProc->lwWaitLink = NULL;
MyProc->waitLock = NULL;
MyProc->waitProcLock = NULL;
......@@ -517,7 +517,7 @@ InitAuxiliaryProcess(void)
MyPgXact->inCommit = false;
MyPgXact->vacuumFlags = 0;
MyProc->lwWaiting = false;
MyProc->lwExclusive = false;
MyProc->lwWaitMode = 0;
MyProc->lwWaitLink = NULL;
MyProc->waitLock = NULL;
MyProc->waitProcLock = NULL;
......
......@@ -35,6 +35,8 @@ provider postgresql {
probe lwlock__wait__done(LWLockId, LWLockMode);
probe lwlock__condacquire(LWLockId, LWLockMode);
probe lwlock__condacquire__fail(LWLockId, LWLockMode);
probe lwlock__wait__until__free(LWLockId, LWLockMode);
probe lwlock__wait__until__free__fail(LWLockId, LWLockMode);
probe lock__wait__start(unsigned int, unsigned int, unsigned int, unsigned int, unsigned int, LOCKMODE);
probe lock__wait__done(unsigned int, unsigned int, unsigned int, unsigned int, unsigned int, LOCKMODE);
......
......@@ -94,7 +94,10 @@ typedef enum LWLockId
typedef enum LWLockMode
{
LW_EXCLUSIVE,
LW_SHARED
LW_SHARED,
LW_WAIT_UNTIL_FREE /* A special mode used in PGPROC->lwlockMode, when
* waiting for lock to become free. Not to be used
* as LWLockAcquire argument */
} LWLockMode;
......@@ -105,6 +108,7 @@ extern bool Trace_lwlocks;
extern LWLockId LWLockAssign(void);
extern void LWLockAcquire(LWLockId lockid, LWLockMode mode);
extern bool LWLockConditionalAcquire(LWLockId lockid, LWLockMode mode);
extern bool LWLockWaitUntilFree(LWLockId lockid, LWLockMode mode);
extern void LWLockRelease(LWLockId lockid);
extern void LWLockReleaseAll(void);
extern bool LWLockHeldByMe(LWLockId lockid);
......
......@@ -101,7 +101,7 @@ struct PGPROC
/* Info about LWLock the process is currently waiting for, if any. */
bool lwWaiting; /* true if waiting for an LW lock */
bool lwExclusive; /* true if waiting for exclusive access */
uint8 lwWaitMode; /* lwlock mode being waited for */
struct PGPROC *lwWaitLink; /* next waiter for same LW lock */
/* Info about lock the process is currently waiting for, if any. */
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment