Commit 7882c3b0 authored by Andres Freund's avatar Andres Freund

Convert the PGPROC->lwWaitLink list into a dlist instead of open coding it.

Besides being shorter and much easier to read it changes the logic in
LWLockRelease() to release all shared lockers when waking up any. This
can yield some significant performance improvements - and the fairness
isn't really much worse than before, as we always allowed new shared
lockers to jump the queue.
parent 570bd2b3
......@@ -390,7 +390,6 @@ MarkAsPreparing(TransactionId xid, const char *gid,
proc->roleId = owner;
proc->lwWaiting = false;
proc->lwWaitMode = 0;
proc->lwWaitLink = NULL;
proc->waitLock = NULL;
proc->waitProcLock = NULL;
for (i = 0; i < NUM_LOCK_PARTITIONS; i++)
......
......@@ -117,9 +117,9 @@ inline static void
PRINT_LWDEBUG(const char *where, const LWLock *lock)
{
if (Trace_lwlocks)
elog(LOG, "%s(%s %d): excl %d shared %d head %p rOK %d",
elog(LOG, "%s(%s %d): excl %d shared %d rOK %d",
where, T_NAME(lock), T_ID(lock),
(int) lock->exclusive, lock->shared, lock->head,
(int) lock->exclusive, lock->shared,
(int) lock->releaseOK);
}
......@@ -479,8 +479,7 @@ LWLockInitialize(LWLock *lock, int tranche_id)
lock->exclusive = 0;
lock->shared = 0;
lock->tranche = tranche_id;
lock->head = NULL;
lock->tail = NULL;
dlist_init(&lock->waiters);
}
......@@ -619,12 +618,7 @@ LWLockAcquireCommon(LWLock *lock, LWLockMode mode, uint64 *valptr, uint64 val)
proc->lwWaiting = true;
proc->lwWaitMode = mode;
proc->lwWaitLink = NULL;
if (lock->head == NULL)
lock->head = proc;
else
lock->tail->lwWaitLink = proc;
lock->tail = proc;
dlist_push_head(&lock->waiters, &proc->lwWaitLink);
/* Can release the mutex now */
SpinLockRelease(&lock->mutex);
......@@ -840,12 +834,7 @@ LWLockAcquireOrWait(LWLock *lock, LWLockMode mode)
proc->lwWaiting = true;
proc->lwWaitMode = LW_WAIT_UNTIL_FREE;
proc->lwWaitLink = NULL;
if (lock->head == NULL)
lock->head = proc;
else
lock->tail->lwWaitLink = proc;
lock->tail = proc;
dlist_push_head(&lock->waiters, &proc->lwWaitLink);
/* Can release the mutex now */
SpinLockRelease(&lock->mutex);
......@@ -1002,10 +991,7 @@ LWLockWaitForVar(LWLock *lock, uint64 *valptr, uint64 oldval, uint64 *newval)
proc->lwWaiting = true;
proc->lwWaitMode = LW_WAIT_UNTIL_FREE;
/* waiters are added to the front of the queue */
proc->lwWaitLink = lock->head;
if (lock->head == NULL)
lock->tail = proc;
lock->head = proc;
dlist_push_head(&lock->waiters, &proc->lwWaitLink);
/*
* Set releaseOK, to make sure we get woken up as soon as the lock is
......@@ -1087,9 +1073,10 @@ LWLockWaitForVar(LWLock *lock, uint64 *valptr, uint64 oldval, uint64 *newval)
void
LWLockUpdateVar(LWLock *lock, uint64 *valptr, uint64 val)
{
PGPROC *head;
PGPROC *proc;
PGPROC *next;
dlist_head wakeup;
dlist_mutable_iter iter;
dlist_init(&wakeup);
/* Acquire mutex. Time spent holding mutex should be short! */
SpinLockAcquire(&lock->mutex);
......@@ -1104,24 +1091,16 @@ LWLockUpdateVar(LWLock *lock, uint64 *valptr, uint64 val)
* See if there are any LW_WAIT_UNTIL_FREE waiters that need to be woken
* up. They are always in the front of the queue.
*/
head = lock->head;
if (head != NULL && head->lwWaitMode == LW_WAIT_UNTIL_FREE)
dlist_foreach_modify(iter, &lock->waiters)
{
proc = head;
next = proc->lwWaitLink;
while (next && next->lwWaitMode == LW_WAIT_UNTIL_FREE)
{
proc = next;
next = next->lwWaitLink;
}
PGPROC *waiter = dlist_container(PGPROC, lwWaitLink, iter.cur);
if (waiter->lwWaitMode != LW_WAIT_UNTIL_FREE)
break;
/* proc is now the last PGPROC to be released */
lock->head = next;
proc->lwWaitLink = NULL;
dlist_delete(&waiter->lwWaitLink);
dlist_push_tail(&wakeup, &waiter->lwWaitLink);
}
else
head = NULL;
/* We are done updating shared state of the lock itself. */
SpinLockRelease(&lock->mutex);
......@@ -1129,15 +1108,14 @@ LWLockUpdateVar(LWLock *lock, uint64 *valptr, uint64 val)
/*
* Awaken any waiters I removed from the queue.
*/
while (head != NULL)
dlist_foreach_modify(iter, &wakeup)
{
proc = head;
head = proc->lwWaitLink;
proc->lwWaitLink = NULL;
PGPROC *waiter = dlist_container(PGPROC, lwWaitLink, iter.cur);
dlist_delete(&waiter->lwWaitLink);
/* check comment in LWLockRelease() about this barrier */
pg_write_barrier();
proc->lwWaiting = false;
PGSemaphoreUnlock(&proc->sem);
waiter->lwWaiting = false;
PGSemaphoreUnlock(&waiter->sem);
}
}
......@@ -1148,10 +1126,12 @@ LWLockUpdateVar(LWLock *lock, uint64 *valptr, uint64 val)
void
LWLockRelease(LWLock *lock)
{
PGPROC *head;
PGPROC *proc;
dlist_head wakeup;
dlist_mutable_iter iter;
int i;
dlist_init(&wakeup);
PRINT_LWDEBUG("LWLockRelease", lock);
/*
......@@ -1187,58 +1167,39 @@ LWLockRelease(LWLock *lock)
* if someone has already awakened waiters that haven't yet acquired the
* lock.
*/
head = lock->head;
if (head != NULL)
if (lock->exclusive == 0 && lock->shared == 0 && lock->releaseOK)
{
if (lock->exclusive == 0 && lock->shared == 0 && lock->releaseOK)
/*
* Remove the to-be-awakened PGPROCs from the queue.
*/
bool releaseOK = true;
bool wokeup_somebody = false;
dlist_foreach_modify(iter, &lock->waiters)
{
/*
* Remove the to-be-awakened PGPROCs from the queue.
*/
bool releaseOK = true;
PGPROC *waiter = dlist_container(PGPROC, lwWaitLink, iter.cur);
proc = head;
if (wokeup_somebody && waiter->lwWaitMode == LW_EXCLUSIVE)
continue;
/*
* First wake up any backends that want to be woken up without
* acquiring the lock.
*/
while (proc->lwWaitMode == LW_WAIT_UNTIL_FREE && proc->lwWaitLink)
proc = proc->lwWaitLink;
dlist_delete(&waiter->lwWaitLink);
dlist_push_tail(&wakeup, &waiter->lwWaitLink);
/*
* If the front waiter wants exclusive lock, awaken him only.
* Otherwise awaken as many waiters as want shared access.
* Prevent additional wakeups until retryer gets to
* run. Backends that are just waiting for the lock to become
* free don't retry automatically.
*/
if (proc->lwWaitMode != LW_EXCLUSIVE)
if (waiter->lwWaitMode != LW_WAIT_UNTIL_FREE)
{
while (proc->lwWaitLink != NULL &&
proc->lwWaitLink->lwWaitMode != LW_EXCLUSIVE)
{
if (proc->lwWaitMode != LW_WAIT_UNTIL_FREE)
releaseOK = false;
proc = proc->lwWaitLink;
}
}
/* proc is now the last PGPROC to be released */
lock->head = proc->lwWaitLink;
proc->lwWaitLink = NULL;
/*
* Prevent additional wakeups until retryer gets to run. Backends
* that are just waiting for the lock to become free don't retry
* automatically.
*/
if (proc->lwWaitMode != LW_WAIT_UNTIL_FREE)
releaseOK = false;
wokeup_somebody = true;
}
lock->releaseOK = releaseOK;
}
else
{
/* lock is still held, can't awaken anything */
head = NULL;
if(waiter->lwWaitMode == LW_EXCLUSIVE)
break;
}
lock->releaseOK = releaseOK;
}
/* We are done updating shared state of the lock itself. */
......@@ -1249,13 +1210,12 @@ LWLockRelease(LWLock *lock)
/*
* Awaken any waiters I removed from the queue.
*/
while (head != NULL)
dlist_foreach_modify(iter, &wakeup)
{
PGPROC *waiter = dlist_container(PGPROC, lwWaitLink, iter.cur);
LOG_LWDEBUG("LWLockRelease", T_NAME(lock), T_ID(lock),
"release waiter");
proc = head;
head = proc->lwWaitLink;
proc->lwWaitLink = NULL;
dlist_delete(&waiter->lwWaitLink);
/*
* Guarantee that lwWaiting being unset only becomes visible once the
* unlink from the link has completed. Otherwise the target backend
......@@ -1267,8 +1227,8 @@ LWLockRelease(LWLock *lock)
* another lock.
*/
pg_write_barrier();
proc->lwWaiting = false;
PGSemaphoreUnlock(&proc->sem);
waiter->lwWaiting = false;
PGSemaphoreUnlock(&waiter->sem);
}
/*
......
......@@ -372,7 +372,6 @@ InitProcess(void)
MyPgXact->vacuumFlags |= PROC_IS_AUTOVACUUM;
MyProc->lwWaiting = false;
MyProc->lwWaitMode = 0;
MyProc->lwWaitLink = NULL;
MyProc->waitLock = NULL;
MyProc->waitProcLock = NULL;
#ifdef USE_ASSERT_CHECKING
......@@ -535,7 +534,6 @@ InitAuxiliaryProcess(void)
MyPgXact->vacuumFlags = 0;
MyProc->lwWaiting = false;
MyProc->lwWaitMode = 0;
MyProc->lwWaitLink = NULL;
MyProc->waitLock = NULL;
MyProc->waitProcLock = NULL;
#ifdef USE_ASSERT_CHECKING
......
......@@ -14,6 +14,7 @@
#ifndef LWLOCK_H
#define LWLOCK_H
#include "lib/ilist.h"
#include "storage/s_lock.h"
struct PGPROC;
......@@ -50,9 +51,7 @@ typedef struct LWLock
char exclusive; /* # of exclusive holders (0 or 1) */
int shared; /* # of shared holders (0..MaxBackends) */
int tranche; /* tranche ID */
struct PGPROC *head; /* head of list of waiting PGPROCs */
struct PGPROC *tail; /* tail of list of waiting PGPROCs */
/* tail is undefined when head is NULL */
dlist_head waiters; /* list of waiting PGPROCs */
} LWLock;
/*
......
......@@ -15,6 +15,7 @@
#define _PROC_H_
#include "access/xlogdefs.h"
#include "lib/ilist.h"
#include "storage/latch.h"
#include "storage/lock.h"
#include "storage/pg_sema.h"
......@@ -104,7 +105,7 @@ struct PGPROC
/* Info about LWLock the process is currently waiting for, if any. */
bool lwWaiting; /* true if waiting for an LW lock */
uint8 lwWaitMode; /* lwlock mode being waited for */
struct PGPROC *lwWaitLink; /* next waiter for same LW lock */
dlist_node lwWaitLink; /* position in LW lock wait list */
/* Info about lock the process is currently waiting for, if any. */
/* waitLock and waitProcLock are NULL if not currently waiting. */
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment