Commit 48354581 authored by Andres Freund

Allow Pin/UnpinBuffer to operate in a lockfree manner.

Pinning/unpinning a buffer is a very frequent operation, especially in
read-mostly, cache-resident workloads. Benchmarking shows that in various
scenarios the spinlock protecting a buffer header's state becomes a
significant bottleneck. The problem can be reproduced with pgbench -S on
larger machines, but can be considerably worse for queries which touch
the same buffers over and over at a high frequency (e.g. nested loops
over a small inner table).

To allow atomic operations to be used, cram BufferDesc's flags,
usage_count, buf_hdr_lock and refcount into a single 32-bit atomic
variable; that allows them to be manipulated together using 32-bit
compare-and-swap operations. This requires reducing MAX_BACKENDS to
2^18-1 (which could be lifted by using a 64-bit field, but that's not a
realistic configuration at the moment).
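
For reference, a sketch of the packed layout implied by the accessor
macros used in the diff (the authoritative definitions live in
buf_internals.h, which is not part of the hunks shown here): roughly 18
bits of refcount, 4 bits of usage count, and 10 flag bits, accessed via
masks and shift helpers along these lines:

    /* assumed layout: bits 0-17 refcount, 18-21 usage count, 22-31 flags */
    #define BUF_REFCOUNT_ONE        1
    #define BUF_REFCOUNT_MASK       ((1U << 18) - 1)
    #define BUF_USAGECOUNT_MASK     0x003C0000U
    #define BUF_USAGECOUNT_ONE      (1U << 18)
    #define BUF_USAGECOUNT_SHIFT    18
    #define BUF_FLAG_MASK           0xFFC00000U

    /* extract the packed fields from a buffer state value */
    #define BUF_STATE_GET_REFCOUNT(state)   ((state) & BUF_REFCOUNT_MASK)
    #define BUF_STATE_GET_USAGECOUNT(state) \
        (((state) & BUF_USAGECOUNT_MASK) >> BUF_USAGECOUNT_SHIFT)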

As not all operations can easily be implemented in a lockfree manner,
implement the previous buf_hdr_lock via a flag bit in the atomic
variable. That way we can continue to lock the header in places where
it's needed, but can get away without acquiring it in the more frequent
hot paths.  There are some additional operations which could be done
without the lock but aren't converted in this patch; the most important
places are covered, though.
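
A simplified sketch of the resulting lock-free fast path (the real code
is PinBuffer() in the diff below; the function name here is only for
illustration): retry a compare-and-swap until the header lock bit is
clear and our update wins the race.

    static bool
    PinBufferSketch(BufferDesc *buf)
    {
        uint32      old_buf_state = pg_atomic_read_u32(&buf->state);
        uint32      buf_state;

        for (;;)
        {
            /* if somebody holds the header lock bit, wait for it to clear */
            if (old_buf_state & BM_LOCKED)
                old_buf_state = WaitBufHdrUnlocked(buf);

            buf_state = old_buf_state;
            buf_state += BUF_REFCOUNT_ONE;          /* take one pin */
            if (BUF_STATE_GET_USAGECOUNT(buf_state) != BM_MAX_USAGE_COUNT)
                buf_state += BUF_USAGECOUNT_ONE;    /* bump usage count */

            /* succeeds only if nobody changed the state concurrently */
            if (pg_atomic_compare_exchange_u32(&buf->state, &old_buf_state,
                                               buf_state))
                return (buf_state & BM_VALID) != 0;
        }
    }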

As bufmgr.c now essentially re-implements spinlocks, abstract the delay
logic from s_lock.c into something more generic. It already has two
users, and more are coming up; there's a followup patch for lwlock.c at
least.
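
The generalized API is the SpinDelayStatus/perform_spin_delay() machinery
used by the new LockBufHdr() and WaitBufHdrUnlocked() in the diff below;
a minimal usage sketch (try_acquire() is a hypothetical stand-in for the
caller's atomic acquisition attempt, e.g. a fetch-or of a lock bit):

    static void
    spin_until_acquired(void *ptr)
    {
        SpinDelayStatus delayStatus = init_spin_delay(ptr);

        while (!try_acquire(ptr))               /* hypothetical attempt */
            perform_spin_delay(&delayStatus);   /* spin, then back off via sleep */

        finish_spin_delay(&delayStatus);        /* update the spin heuristic */
    }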

This patch is based on a proof-of-concept written by me, which Alexander
Korotkov made into a fully working patch; the committed version was again
revised by me.  Benchmarking and testing have, amongst others, been
provided by Dilip Kumar, Alexander Korotkov, and Robert Haas.

On a large x86 system, improvements of a factor of 8 have been observed
for read-only pgbench with a high client count.

Author: Alexander Korotkov and Andres Freund
Discussion: 2400449.GjM57CE0Yg@dinodell
parent cf223c3b
@@ -148,11 +148,12 @@ pg_buffercache_pages(PG_FUNCTION_ARGS)
          */
         for (i = 0; i < NBuffers; i++)
         {
-            volatile BufferDesc *bufHdr;
+            BufferDesc *bufHdr;
+            uint32      buf_state;
 
             bufHdr = GetBufferDescriptor(i);
             /* Lock each buffer header before inspecting. */
-            LockBufHdr(bufHdr);
+            buf_state = LockBufHdr(bufHdr);
 
             fctx->record[i].bufferid = BufferDescriptorGetBuffer(bufHdr);
             fctx->record[i].relfilenode = bufHdr->tag.rnode.relNode;
@@ -160,21 +161,21 @@ pg_buffercache_pages(PG_FUNCTION_ARGS)
             fctx->record[i].reldatabase = bufHdr->tag.rnode.dbNode;
             fctx->record[i].forknum = bufHdr->tag.forkNum;
             fctx->record[i].blocknum = bufHdr->tag.blockNum;
-            fctx->record[i].usagecount = bufHdr->usage_count;
-            fctx->record[i].pinning_backends = bufHdr->refcount;
+            fctx->record[i].usagecount = BUF_STATE_GET_USAGECOUNT(buf_state);
+            fctx->record[i].pinning_backends = BUF_STATE_GET_REFCOUNT(buf_state);
 
-            if (bufHdr->flags & BM_DIRTY)
+            if (buf_state & BM_DIRTY)
                 fctx->record[i].isdirty = true;
             else
                 fctx->record[i].isdirty = false;
 
             /* Note if the buffer is valid, and has storage created */
-            if ((bufHdr->flags & BM_VALID) && (bufHdr->flags & BM_TAG_VALID))
+            if ((buf_state & BM_VALID) && (buf_state & BM_TAG_VALID))
                 fctx->record[i].isvalid = true;
             else
                 fctx->record[i].isvalid = false;
 
-            UnlockBufHdr(bufHdr);
+            UnlockBufHdr(bufHdr, buf_state);
         }
 
     /*
......
@@ -135,12 +135,9 @@ InitBufferPool(void)
             BufferDesc *buf = GetBufferDescriptor(i);
 
             CLEAR_BUFFERTAG(buf->tag);
-            buf->flags = 0;
-            buf->usage_count = 0;
-            buf->refcount = 0;
-            buf->wait_backend_pid = 0;
 
-            SpinLockInit(&buf->buf_hdr_lock);
+            pg_atomic_init_u32(&buf->state, 0);
+            buf->wait_backend_pid = 0;
 
             buf->buf_id = i;
......
@@ -436,11 +436,12 @@ static bool PinBuffer(BufferDesc *buf, BufferAccessStrategy strategy);
 static void PinBuffer_Locked(BufferDesc *buf);
 static void UnpinBuffer(BufferDesc *buf, bool fixOwner);
 static void BufferSync(int flags);
+static uint32 WaitBufHdrUnlocked(BufferDesc *buf);
 static int  SyncOneBuffer(int buf_id, bool skip_recently_used,
               WritebackContext *flush_context);
 static void WaitIO(BufferDesc *buf);
 static bool StartBufferIO(BufferDesc *buf, bool forInput);
 static void TerminateBufferIO(BufferDesc *buf, bool clear_dirty,
-                  int set_flag_bits);
+                  uint32 set_flag_bits);
 static void shared_buffer_write_error_callback(void *arg);
 static void local_buffer_write_error_callback(void *arg);
 static BufferDesc *BufferAlloc(SMgrRelation smgr,
@@ -816,8 +817,11 @@ ReadBuffer_common(SMgrRelation smgr, char relpersistence, ForkNumber forkNum,
         if (isLocalBuf)
         {
             /* Only need to adjust flags */
-            Assert(bufHdr->flags & BM_VALID);
-            bufHdr->flags &= ~BM_VALID;
+            uint32      buf_state = pg_atomic_read_u32(&bufHdr->state);
+
+            Assert(buf_state & BM_VALID);
+            buf_state &= ~BM_VALID;
+            pg_atomic_write_u32(&bufHdr->state, buf_state);
         }
         else
         {
@@ -828,10 +832,11 @@ ReadBuffer_common(SMgrRelation smgr, char relpersistence, ForkNumber forkNum,
              */
             do
             {
-                LockBufHdr(bufHdr);
-                Assert(bufHdr->flags & BM_VALID);
-                bufHdr->flags &= ~BM_VALID;
-                UnlockBufHdr(bufHdr);
+                uint32      buf_state = LockBufHdr(bufHdr);
+
+                Assert(buf_state & BM_VALID);
+                buf_state &= ~BM_VALID;
+                UnlockBufHdr(bufHdr, buf_state);
             } while (!StartBufferIO(bufHdr, true));
         }
     }
@@ -848,7 +853,7 @@ ReadBuffer_common(SMgrRelation smgr, char relpersistence, ForkNumber forkNum,
      * it's not been recycled) but come right back here to try smgrextend
      * again.
      */
-    Assert(!(bufHdr->flags & BM_VALID));        /* spinlock not needed */
+    Assert(!(pg_atomic_read_u32(&bufHdr->state) & BM_VALID));   /* spinlock not needed */
 
     bufBlock = isLocalBuf ? LocalBufHdrGetBlock(bufHdr) : BufHdrGetBlock(bufHdr);
@@ -933,7 +938,10 @@ ReadBuffer_common(SMgrRelation smgr, char relpersistence, ForkNumber forkNum,
     if (isLocalBuf)
     {
         /* Only need to adjust flags */
-        bufHdr->flags |= BM_VALID;
+        uint32      buf_state = pg_atomic_read_u32(&bufHdr->state);
+
+        buf_state |= BM_VALID;
+        pg_atomic_write_u32(&bufHdr->state, buf_state);
     }
     else
     {
@@ -987,10 +995,11 @@ BufferAlloc(SMgrRelation smgr, char relpersistence, ForkNumber forkNum,
     BufferTag   oldTag;         /* previous identity of selected buffer */
     uint32      oldHash;        /* hash value for oldTag */
     LWLock     *oldPartitionLock;   /* buffer partition lock for it */
-    BufFlags    oldFlags;
+    uint32      oldFlags;
     int         buf_id;
     BufferDesc *buf;
     bool        valid;
+    uint32      buf_state;
 
     /* create a tag so we can lookup the buffer */
     INIT_BUFFERTAG(newTag, smgr->smgr_rnode.node, forkNum, blockNum);
@@ -1059,12 +1068,12 @@ BufferAlloc(SMgrRelation smgr, char relpersistence, ForkNumber forkNum,
         * Select a victim buffer.  The buffer is returned with its header
         * spinlock still held!
         */
-        buf = StrategyGetBuffer(strategy);
+        buf = StrategyGetBuffer(strategy, &buf_state);
 
-        Assert(buf->refcount == 0);
+        Assert(BUF_STATE_GET_REFCOUNT(buf_state) == 0);
 
         /* Must copy buffer flags while we still hold the spinlock */
-        oldFlags = buf->flags;
+        oldFlags = buf_state & BUF_FLAG_MASK;
 
         /* Pin the buffer and then release the buffer spinlock */
         PinBuffer_Locked(buf);
@@ -1108,9 +1117,9 @@ BufferAlloc(SMgrRelation smgr, char relpersistence, ForkNumber forkNum,
                 XLogRecPtr  lsn;
 
                 /* Read the LSN while holding buffer header lock */
-                LockBufHdr(buf);
+                buf_state = LockBufHdr(buf);
                 lsn = BufferGetLSN(buf);
-                UnlockBufHdr(buf);
+                UnlockBufHdr(buf, buf_state);
 
                 if (XLogNeedsFlush(lsn) &&
                     StrategyRejectBuffer(strategy, buf))
@@ -1254,7 +1263,7 @@ BufferAlloc(SMgrRelation smgr, char relpersistence, ForkNumber forkNum,
         /*
          * Need to lock the buffer header too in order to change its tag.
          */
-        LockBufHdr(buf);
+        buf_state = LockBufHdr(buf);
 
         /*
          * Somebody could have pinned or re-dirtied the buffer while we were
@@ -1262,11 +1271,11 @@ BufferAlloc(SMgrRelation smgr, char relpersistence, ForkNumber forkNum,
          * recycle this buffer; we must undo everything we've done and start
          * over with a new victim buffer.
          */
-        oldFlags = buf->flags;
-        if (buf->refcount == 1 && !(oldFlags & BM_DIRTY))
+        oldFlags = buf_state & BUF_FLAG_MASK;
+        if (BUF_STATE_GET_REFCOUNT(buf_state) == 1 && !(oldFlags & BM_DIRTY))
             break;
 
-        UnlockBufHdr(buf);
+        UnlockBufHdr(buf, buf_state);
         BufTableDelete(&newTag, newHash);
         if ((oldFlags & BM_TAG_VALID) &&
             oldPartitionLock != newPartitionLock)
@@ -1284,14 +1293,15 @@ BufferAlloc(SMgrRelation smgr, char relpersistence, ForkNumber forkNum,
      * 1 so that the buffer can survive one clock-sweep pass.)
      */
     buf->tag = newTag;
-    buf->flags &= ~(BM_VALID | BM_DIRTY | BM_JUST_DIRTIED | BM_CHECKPOINT_NEEDED | BM_IO_ERROR | BM_PERMANENT);
+    buf_state &= ~(BM_VALID | BM_DIRTY | BM_JUST_DIRTIED |
+                   BM_CHECKPOINT_NEEDED | BM_IO_ERROR | BM_PERMANENT |
+                   BUF_USAGECOUNT_MASK);
     if (relpersistence == RELPERSISTENCE_PERMANENT)
-        buf->flags |= BM_TAG_VALID | BM_PERMANENT;
+        buf_state |= BM_TAG_VALID | BM_PERMANENT | BUF_USAGECOUNT_ONE;
     else
-        buf->flags |= BM_TAG_VALID;
-    buf->usage_count = 1;
+        buf_state |= BM_TAG_VALID | BUF_USAGECOUNT_ONE;
 
-    UnlockBufHdr(buf);
+    UnlockBufHdr(buf, buf_state);
 
     if (oldFlags & BM_TAG_VALID)
     {
@@ -1338,12 +1348,15 @@ InvalidateBuffer(BufferDesc *buf)
     BufferTag   oldTag;
     uint32      oldHash;        /* hash value for oldTag */
     LWLock     *oldPartitionLock;   /* buffer partition lock for it */
-    BufFlags    oldFlags;
+    uint32      oldFlags;
+    uint32      buf_state;
 
     /* Save the original buffer tag before dropping the spinlock */
     oldTag = buf->tag;
 
-    UnlockBufHdr(buf);
+    buf_state = pg_atomic_read_u32(&buf->state);
+    Assert(buf_state & BM_LOCKED);
+    UnlockBufHdr(buf, buf_state);
 
     /*
      * Need to compute the old tag's hashcode and partition lock ID.  XXX is it
@@ -1362,12 +1375,12 @@ retry:
     LWLockAcquire(oldPartitionLock, LW_EXCLUSIVE);
 
     /* Re-lock the buffer header */
-    LockBufHdr(buf);
+    buf_state = LockBufHdr(buf);
 
     /* If it's changed while we were waiting for lock, do nothing */
     if (!BUFFERTAGS_EQUAL(buf->tag, oldTag))
     {
-        UnlockBufHdr(buf);
+        UnlockBufHdr(buf, buf_state);
         LWLockRelease(oldPartitionLock);
         return;
     }
@@ -1381,9 +1394,9 @@ retry:
      * yet done StartBufferIO, WaitIO will fall through and we'll effectively
      * be busy-looping here.)
      */
-    if (buf->refcount != 0)
+    if (BUF_STATE_GET_REFCOUNT(buf_state) != 0)
     {
-        UnlockBufHdr(buf);
+        UnlockBufHdr(buf, buf_state);
         LWLockRelease(oldPartitionLock);
         /* safety check: should definitely not be our *own* pin */
         if (GetPrivateRefCount(BufferDescriptorGetBuffer(buf)) > 0)
@@ -1396,12 +1409,10 @@ retry:
      * Clear out the buffer's tag and flags.  We must do this to ensure that
      * linear scans of the buffer array don't think the buffer is valid.
      */
-    oldFlags = buf->flags;
+    oldFlags = buf_state & BUF_FLAG_MASK;
     CLEAR_BUFFERTAG(buf->tag);
-    buf->flags = 0;
-    buf->usage_count = 0;
-
-    UnlockBufHdr(buf);
+    buf_state &= ~(BUF_FLAG_MASK | BUF_USAGECOUNT_MASK);
+    UnlockBufHdr(buf, buf_state);
 
     /*
      * Remove the buffer from the lookup hashtable, if it was in there.
@@ -1433,6 +1444,8 @@ void
 MarkBufferDirty(Buffer buffer)
 {
     BufferDesc *bufHdr;
+    uint32      buf_state;
+    uint32      old_buf_state;
 
     if (!BufferIsValid(buffer))
         elog(ERROR, "bad buffer ID: %d", buffer);
@@ -1449,24 +1462,32 @@ MarkBufferDirty(Buffer buffer)
     /* unfortunately we can't check if the lock is held exclusively */
     Assert(LWLockHeldByMe(BufferDescriptorGetContentLock(bufHdr)));
 
-    LockBufHdr(bufHdr);
-
-    Assert(bufHdr->refcount > 0);
+    old_buf_state = pg_atomic_read_u32(&bufHdr->state);
+    for (;;)
+    {
+        if (old_buf_state & BM_LOCKED)
+            old_buf_state = WaitBufHdrUnlocked(bufHdr);
+
+        buf_state = old_buf_state;
+
+        Assert(BUF_STATE_GET_REFCOUNT(buf_state) > 0);
+        buf_state |= BM_DIRTY | BM_JUST_DIRTIED;
+
+        if (pg_atomic_compare_exchange_u32(&bufHdr->state, &old_buf_state,
+                                           buf_state))
+            break;
+    }
 
     /*
      * If the buffer was not dirty already, do vacuum accounting.
      */
-    if (!(bufHdr->flags & BM_DIRTY))
+    if (!(old_buf_state & BM_DIRTY))
     {
         VacuumPageDirty++;
         pgBufferUsage.shared_blks_dirtied++;
         if (VacuumCostActive)
             VacuumCostBalance += VacuumCostPageDirty;
     }
-
-    bufHdr->flags |= (BM_DIRTY | BM_JUST_DIRTIED);
-
-    UnlockBufHdr(bufHdr);
 }
/* /*
...@@ -1531,6 +1552,10 @@ ReleaseAndReadBuffer(Buffer buffer, ...@@ -1531,6 +1552,10 @@ ReleaseAndReadBuffer(Buffer buffer,
* *
* This should be applied only to shared buffers, never local ones. * This should be applied only to shared buffers, never local ones.
* *
* Since buffers are pinned/unpinned very frequently, pin buffers without
* taking the buffer header lock; instead update the state variable in loop of
* CAS operations. Hopefully it's just a single CAS.
*
* Note that ResourceOwnerEnlargeBuffers must have been done already. * Note that ResourceOwnerEnlargeBuffers must have been done already.
* *
* Returns TRUE if buffer is BM_VALID, else FALSE. This provision allows * Returns TRUE if buffer is BM_VALID, else FALSE. This provision allows
@@ -1547,23 +1572,34 @@ PinBuffer(BufferDesc *buf, BufferAccessStrategy strategy)
 
     if (ref == NULL)
     {
+        uint32      buf_state;
+        uint32      old_buf_state;
+
         ReservePrivateRefCountEntry();
         ref = NewPrivateRefCountEntry(b);
 
-        LockBufHdr(buf);
-        buf->refcount++;
-        if (strategy == NULL)
-        {
-            if (buf->usage_count < BM_MAX_USAGE_COUNT)
-                buf->usage_count++;
-        }
-        else
-        {
-            if (buf->usage_count == 0)
-                buf->usage_count = 1;
-        }
-        result = (buf->flags & BM_VALID) != 0;
-        UnlockBufHdr(buf);
+        old_buf_state = pg_atomic_read_u32(&buf->state);
+        for (;;)
+        {
+            if (old_buf_state & BM_LOCKED)
+                old_buf_state = WaitBufHdrUnlocked(buf);
+
+            buf_state = old_buf_state;
+
+            /* increase refcount */
+            buf_state += BUF_REFCOUNT_ONE;
+
+            /* increase usagecount unless already max */
+            if (BUF_STATE_GET_USAGECOUNT(buf_state) != BM_MAX_USAGE_COUNT)
+                buf_state += BUF_USAGECOUNT_ONE;
+
+            if (pg_atomic_compare_exchange_u32(&buf->state, &old_buf_state,
+                                               buf_state))
+            {
+                result = (buf_state & BM_VALID) != 0;
+                break;
+            }
+        }
     }
     else
     {
@@ -1603,6 +1639,7 @@ PinBuffer_Locked(BufferDesc *buf)
 {
     Buffer      b;
     PrivateRefCountEntry *ref;
+    uint32      buf_state;
 
     /*
      * As explained, We don't expect any preexisting pins. That allows us to
@@ -1610,8 +1647,14 @@ PinBuffer_Locked(BufferDesc *buf)
      */
     Assert(GetPrivateRefCountEntry(BufferDescriptorGetBuffer(buf), false) == NULL);
 
-    buf->refcount++;
-    UnlockBufHdr(buf);
+    /*
+     * Since we hold the buffer spinlock, we can update the buffer state and
+     * release the lock in one operation.
+     */
+    buf_state = pg_atomic_read_u32(&buf->state);
+    Assert(buf_state & BM_LOCKED);
+    buf_state += BUF_REFCOUNT_ONE;
+    UnlockBufHdr(buf, buf_state);
 
     b = BufferDescriptorGetBuffer(buf);
@@ -1646,30 +1689,59 @@ UnpinBuffer(BufferDesc *buf, bool fixOwner)
     ref->refcount--;
     if (ref->refcount == 0)
     {
+        uint32      buf_state;
+        uint32      old_buf_state;
+
         /* I'd better not still hold any locks on the buffer */
         Assert(!LWLockHeldByMe(BufferDescriptorGetContentLock(buf)));
         Assert(!LWLockHeldByMe(BufferDescriptorGetIOLock(buf)));
 
-        LockBufHdr(buf);
-
-        /* Decrement the shared reference count */
-        Assert(buf->refcount > 0);
-        buf->refcount--;
+        /*
+         * Decrement the shared reference count.
+         *
+         * Since buffer spinlock holder can update status using just write,
+         * it's not safe to use atomic decrement here; thus use a CAS loop.
+         */
+        old_buf_state = pg_atomic_read_u32(&buf->state);
+        for (;;)
+        {
+            if (old_buf_state & BM_LOCKED)
+                old_buf_state = WaitBufHdrUnlocked(buf);
+
+            buf_state = old_buf_state;
+
+            buf_state -= BUF_REFCOUNT_ONE;
+
+            if (pg_atomic_compare_exchange_u32(&buf->state, &old_buf_state,
+                                               buf_state))
+                break;
+        }
 
         /* Support LockBufferForCleanup() */
-        if ((buf->flags & BM_PIN_COUNT_WAITER) &&
-            buf->refcount == 1)
+        if (buf_state & BM_PIN_COUNT_WAITER)
         {
-            /* we just released the last pin other than the waiter's */
-            int         wait_backend_pid = buf->wait_backend_pid;
-
-            buf->flags &= ~BM_PIN_COUNT_WAITER;
-            UnlockBufHdr(buf);
-            ProcSendSignal(wait_backend_pid);
-        }
-        else
-            UnlockBufHdr(buf);
+            /*
+             * Acquire the buffer header lock, re-check that there's a waiter.
+             * Another backend could have unpinned this buffer, and already
+             * woken up the waiter.  There's no danger of the buffer being
+             * replaced after we unpinned it above, as it's pinned by the
+             * waiter.
+             */
+            buf_state = LockBufHdr(buf);
+
+            if ((buf_state & BM_PIN_COUNT_WAITER) &&
+                BUF_STATE_GET_REFCOUNT(buf_state) == 1)
+            {
+                /* we just released the last pin other than the waiter's */
+                int         wait_backend_pid = buf->wait_backend_pid;
+
+                buf_state &= ~BM_PIN_COUNT_WAITER;
+                UnlockBufHdr(buf, buf_state);
+                ProcSendSignal(wait_backend_pid);
+            }
+            else
+                UnlockBufHdr(buf, buf_state);
+        }
 
         ForgetPrivateRefCountEntry(ref);
     }
 }
...@@ -1687,6 +1759,7 @@ UnpinBuffer(BufferDesc *buf, bool fixOwner) ...@@ -1687,6 +1759,7 @@ UnpinBuffer(BufferDesc *buf, bool fixOwner)
static void static void
BufferSync(int flags) BufferSync(int flags)
{ {
uint32 buf_state;
int buf_id; int buf_id;
int num_to_scan; int num_to_scan;
int num_spaces; int num_spaces;
...@@ -1736,13 +1809,13 @@ BufferSync(int flags) ...@@ -1736,13 +1809,13 @@ BufferSync(int flags)
* Header spinlock is enough to examine BM_DIRTY, see comment in * Header spinlock is enough to examine BM_DIRTY, see comment in
* SyncOneBuffer. * SyncOneBuffer.
*/ */
LockBufHdr(bufHdr); buf_state = LockBufHdr(bufHdr);
if ((bufHdr->flags & mask) == mask) if ((buf_state & mask) == mask)
{ {
CkptSortItem *item; CkptSortItem *item;
bufHdr->flags |= BM_CHECKPOINT_NEEDED; buf_state |= BM_CHECKPOINT_NEEDED;
item = &CkptBufferIds[num_to_scan++]; item = &CkptBufferIds[num_to_scan++];
item->buf_id = buf_id; item->buf_id = buf_id;
...@@ -1752,7 +1825,7 @@ BufferSync(int flags) ...@@ -1752,7 +1825,7 @@ BufferSync(int flags)
item->blockNum = bufHdr->tag.blockNum; item->blockNum = bufHdr->tag.blockNum;
} }
UnlockBufHdr(bufHdr); UnlockBufHdr(bufHdr, buf_state);
} }
if (num_to_scan == 0) if (num_to_scan == 0)
...@@ -1888,7 +1961,7 @@ BufferSync(int flags) ...@@ -1888,7 +1961,7 @@ BufferSync(int flags)
* write the buffer though we didn't need to. It doesn't seem worth * write the buffer though we didn't need to. It doesn't seem worth
* guarding against this, though. * guarding against this, though.
*/ */
if (bufHdr->flags & BM_CHECKPOINT_NEEDED) if (pg_atomic_read_u32(&bufHdr->state) & BM_CHECKPOINT_NEEDED)
{ {
if (SyncOneBuffer(buf_id, false, &wb_context) & BUF_WRITTEN) if (SyncOneBuffer(buf_id, false, &wb_context) & BUF_WRITTEN)
{ {
...@@ -2176,8 +2249,8 @@ BgBufferSync(WritebackContext *wb_context) ...@@ -2176,8 +2249,8 @@ BgBufferSync(WritebackContext *wb_context)
/* Execute the LRU scan */ /* Execute the LRU scan */
while (num_to_scan > 0 && reusable_buffers < upcoming_alloc_est) while (num_to_scan > 0 && reusable_buffers < upcoming_alloc_est)
{ {
int buffer_state = SyncOneBuffer(next_to_clean, true, int sync_state = SyncOneBuffer(next_to_clean, true,
wb_context); wb_context);
if (++next_to_clean >= NBuffers) if (++next_to_clean >= NBuffers)
{ {
...@@ -2186,7 +2259,7 @@ BgBufferSync(WritebackContext *wb_context) ...@@ -2186,7 +2259,7 @@ BgBufferSync(WritebackContext *wb_context)
} }
num_to_scan--; num_to_scan--;
if (buffer_state & BUF_WRITTEN) if (sync_state & BUF_WRITTEN)
{ {
reusable_buffers++; reusable_buffers++;
if (++num_written >= bgwriter_lru_maxpages) if (++num_written >= bgwriter_lru_maxpages)
...@@ -2195,7 +2268,7 @@ BgBufferSync(WritebackContext *wb_context) ...@@ -2195,7 +2268,7 @@ BgBufferSync(WritebackContext *wb_context)
break; break;
} }
} }
else if (buffer_state & BUF_REUSABLE) else if (sync_state & BUF_REUSABLE)
reusable_buffers++; reusable_buffers++;
} }
...@@ -2258,6 +2331,7 @@ SyncOneBuffer(int buf_id, bool skip_recently_used, WritebackContext *wb_context) ...@@ -2258,6 +2331,7 @@ SyncOneBuffer(int buf_id, bool skip_recently_used, WritebackContext *wb_context)
{ {
BufferDesc *bufHdr = GetBufferDescriptor(buf_id); BufferDesc *bufHdr = GetBufferDescriptor(buf_id);
int result = 0; int result = 0;
uint32 buf_state;
BufferTag tag; BufferTag tag;
ReservePrivateRefCountEntry(); ReservePrivateRefCountEntry();
...@@ -2271,21 +2345,24 @@ SyncOneBuffer(int buf_id, bool skip_recently_used, WritebackContext *wb_context) ...@@ -2271,21 +2345,24 @@ SyncOneBuffer(int buf_id, bool skip_recently_used, WritebackContext *wb_context)
* don't worry because our checkpoint.redo points before log record for * don't worry because our checkpoint.redo points before log record for
* upcoming changes and so we are not required to write such dirty buffer. * upcoming changes and so we are not required to write such dirty buffer.
*/ */
LockBufHdr(bufHdr); buf_state = LockBufHdr(bufHdr);
if (bufHdr->refcount == 0 && bufHdr->usage_count == 0) if (BUF_STATE_GET_REFCOUNT(buf_state) == 0 &&
BUF_STATE_GET_USAGECOUNT(buf_state) == 0)
{
result |= BUF_REUSABLE; result |= BUF_REUSABLE;
}
else if (skip_recently_used) else if (skip_recently_used)
{ {
/* Caller told us not to write recently-used buffers */ /* Caller told us not to write recently-used buffers */
UnlockBufHdr(bufHdr); UnlockBufHdr(bufHdr, buf_state);
return result; return result;
} }
if (!(bufHdr->flags & BM_VALID) || !(bufHdr->flags & BM_DIRTY)) if (!(buf_state & BM_VALID) || !(buf_state & BM_DIRTY))
{ {
/* It's clean, so nothing to do */ /* It's clean, so nothing to do */
UnlockBufHdr(bufHdr); UnlockBufHdr(bufHdr, buf_state);
return result; return result;
} }
...@@ -2439,6 +2516,7 @@ PrintBufferLeakWarning(Buffer buffer) ...@@ -2439,6 +2516,7 @@ PrintBufferLeakWarning(Buffer buffer)
int32 loccount; int32 loccount;
char *path; char *path;
BackendId backend; BackendId backend;
uint32 buf_state;
Assert(BufferIsValid(buffer)); Assert(BufferIsValid(buffer));
if (BufferIsLocal(buffer)) if (BufferIsLocal(buffer))
...@@ -2456,12 +2534,13 @@ PrintBufferLeakWarning(Buffer buffer) ...@@ -2456,12 +2534,13 @@ PrintBufferLeakWarning(Buffer buffer)
/* theoretically we should lock the bufhdr here */ /* theoretically we should lock the bufhdr here */
path = relpathbackend(buf->tag.rnode, backend, buf->tag.forkNum); path = relpathbackend(buf->tag.rnode, backend, buf->tag.forkNum);
buf_state = pg_atomic_read_u32(&buf->state);
elog(WARNING, elog(WARNING,
"buffer refcount leak: [%03d] " "buffer refcount leak: [%03d] "
"(rel=%s, blockNum=%u, flags=0x%x, refcount=%u %d)", "(rel=%s, blockNum=%u, flags=0x%x, refcount=%u %d)",
buffer, path, buffer, path,
buf->tag.blockNum, buf->flags, buf->tag.blockNum, buf_state & BUF_FLAG_MASK,
buf->refcount, loccount); BUF_STATE_GET_REFCOUNT(buf_state), loccount);
pfree(path); pfree(path);
} }
...@@ -2573,6 +2652,7 @@ FlushBuffer(BufferDesc *buf, SMgrRelation reln) ...@@ -2573,6 +2652,7 @@ FlushBuffer(BufferDesc *buf, SMgrRelation reln)
io_time; io_time;
Block bufBlock; Block bufBlock;
char *bufToWrite; char *bufToWrite;
uint32 buf_state;
/* /*
* Acquire the buffer's io_in_progress lock. If StartBufferIO returns * Acquire the buffer's io_in_progress lock. If StartBufferIO returns
...@@ -2598,7 +2678,7 @@ FlushBuffer(BufferDesc *buf, SMgrRelation reln) ...@@ -2598,7 +2678,7 @@ FlushBuffer(BufferDesc *buf, SMgrRelation reln)
reln->smgr_rnode.node.dbNode, reln->smgr_rnode.node.dbNode,
reln->smgr_rnode.node.relNode); reln->smgr_rnode.node.relNode);
LockBufHdr(buf); buf_state = LockBufHdr(buf);
/* /*
* Run PageGetLSN while holding header lock, since we don't have the * Run PageGetLSN while holding header lock, since we don't have the
...@@ -2607,8 +2687,8 @@ FlushBuffer(BufferDesc *buf, SMgrRelation reln) ...@@ -2607,8 +2687,8 @@ FlushBuffer(BufferDesc *buf, SMgrRelation reln)
recptr = BufferGetLSN(buf); recptr = BufferGetLSN(buf);
/* To check if block content changes while flushing. - vadim 01/17/97 */ /* To check if block content changes while flushing. - vadim 01/17/97 */
buf->flags &= ~BM_JUST_DIRTIED; buf_state &= ~BM_JUST_DIRTIED;
UnlockBufHdr(buf); UnlockBufHdr(buf, buf_state);
/* /*
* Force XLOG flush up to buffer's LSN. This implements the basic WAL * Force XLOG flush up to buffer's LSN. This implements the basic WAL
...@@ -2627,7 +2707,7 @@ FlushBuffer(BufferDesc *buf, SMgrRelation reln) ...@@ -2627,7 +2707,7 @@ FlushBuffer(BufferDesc *buf, SMgrRelation reln)
* disastrous system-wide consequences. To make sure that can't happen, * disastrous system-wide consequences. To make sure that can't happen,
* skip the flush if the buffer isn't permanent. * skip the flush if the buffer isn't permanent.
*/ */
if (buf->flags & BM_PERMANENT) if (buf_state & BM_PERMANENT)
XLogFlush(recptr); XLogFlush(recptr);
/* /*
...@@ -2716,12 +2796,12 @@ BufferIsPermanent(Buffer buffer) ...@@ -2716,12 +2796,12 @@ BufferIsPermanent(Buffer buffer)
/* /*
* BM_PERMANENT can't be changed while we hold a pin on the buffer, so we * BM_PERMANENT can't be changed while we hold a pin on the buffer, so we
* need not bother with the buffer header spinlock. Even if someone else * need not bother with the buffer header spinlock. Even if someone else
* changes the buffer header flags while we're doing this, we assume that * changes the buffer header state while we're doing this, the state is
* changing an aligned 2-byte BufFlags value is atomic, so we'll read the * changed atomically, so we'll read the old value or the new value, but
* old value or the new value, but not random garbage. * not random garbage.
*/ */
bufHdr = GetBufferDescriptor(buffer - 1); bufHdr = GetBufferDescriptor(buffer - 1);
return (bufHdr->flags & BM_PERMANENT) != 0; return (pg_atomic_read_u32(&bufHdr->state) & BM_PERMANENT) != 0;
} }
/* /*
...@@ -2736,6 +2816,7 @@ BufferGetLSNAtomic(Buffer buffer) ...@@ -2736,6 +2816,7 @@ BufferGetLSNAtomic(Buffer buffer)
BufferDesc *bufHdr = GetBufferDescriptor(buffer - 1); BufferDesc *bufHdr = GetBufferDescriptor(buffer - 1);
char *page = BufferGetPage(buffer, NULL, NULL, BGP_NO_SNAPSHOT_TEST); char *page = BufferGetPage(buffer, NULL, NULL, BGP_NO_SNAPSHOT_TEST);
XLogRecPtr lsn; XLogRecPtr lsn;
uint32 buf_state;
/* /*
* If we don't need locking for correctness, fastpath out. * If we don't need locking for correctness, fastpath out.
...@@ -2747,9 +2828,9 @@ BufferGetLSNAtomic(Buffer buffer) ...@@ -2747,9 +2828,9 @@ BufferGetLSNAtomic(Buffer buffer)
Assert(BufferIsValid(buffer)); Assert(BufferIsValid(buffer));
Assert(BufferIsPinned(buffer)); Assert(BufferIsPinned(buffer));
LockBufHdr(bufHdr); buf_state = LockBufHdr(bufHdr);
lsn = PageGetLSN(page); lsn = PageGetLSN(page);
UnlockBufHdr(bufHdr); UnlockBufHdr(bufHdr, buf_state);
return lsn; return lsn;
} }
...@@ -2797,6 +2878,7 @@ DropRelFileNodeBuffers(RelFileNodeBackend rnode, ForkNumber forkNum, ...@@ -2797,6 +2878,7 @@ DropRelFileNodeBuffers(RelFileNodeBackend rnode, ForkNumber forkNum,
for (i = 0; i < NBuffers; i++) for (i = 0; i < NBuffers; i++)
{ {
BufferDesc *bufHdr = GetBufferDescriptor(i); BufferDesc *bufHdr = GetBufferDescriptor(i);
uint32 buf_state;
/* /*
* We can make this a tad faster by prechecking the buffer tag before * We can make this a tad faster by prechecking the buffer tag before
...@@ -2817,13 +2899,13 @@ DropRelFileNodeBuffers(RelFileNodeBackend rnode, ForkNumber forkNum, ...@@ -2817,13 +2899,13 @@ DropRelFileNodeBuffers(RelFileNodeBackend rnode, ForkNumber forkNum,
if (!RelFileNodeEquals(bufHdr->tag.rnode, rnode.node)) if (!RelFileNodeEquals(bufHdr->tag.rnode, rnode.node))
continue; continue;
LockBufHdr(bufHdr); buf_state = LockBufHdr(bufHdr);
if (RelFileNodeEquals(bufHdr->tag.rnode, rnode.node) && if (RelFileNodeEquals(bufHdr->tag.rnode, rnode.node) &&
bufHdr->tag.forkNum == forkNum && bufHdr->tag.forkNum == forkNum &&
bufHdr->tag.blockNum >= firstDelBlock) bufHdr->tag.blockNum >= firstDelBlock)
InvalidateBuffer(bufHdr); /* releases spinlock */ InvalidateBuffer(bufHdr); /* releases spinlock */
else else
UnlockBufHdr(bufHdr); UnlockBufHdr(bufHdr, buf_state);
} }
} }
...@@ -2887,6 +2969,7 @@ DropRelFileNodesAllBuffers(RelFileNodeBackend *rnodes, int nnodes) ...@@ -2887,6 +2969,7 @@ DropRelFileNodesAllBuffers(RelFileNodeBackend *rnodes, int nnodes)
{ {
RelFileNode *rnode = NULL; RelFileNode *rnode = NULL;
BufferDesc *bufHdr = GetBufferDescriptor(i); BufferDesc *bufHdr = GetBufferDescriptor(i);
uint32 buf_state;
/* /*
* As in DropRelFileNodeBuffers, an unlocked precheck should be safe * As in DropRelFileNodeBuffers, an unlocked precheck should be safe
...@@ -2917,11 +3000,11 @@ DropRelFileNodesAllBuffers(RelFileNodeBackend *rnodes, int nnodes) ...@@ -2917,11 +3000,11 @@ DropRelFileNodesAllBuffers(RelFileNodeBackend *rnodes, int nnodes)
if (rnode == NULL) if (rnode == NULL)
continue; continue;
LockBufHdr(bufHdr); buf_state = LockBufHdr(bufHdr);
if (RelFileNodeEquals(bufHdr->tag.rnode, (*rnode))) if (RelFileNodeEquals(bufHdr->tag.rnode, (*rnode)))
InvalidateBuffer(bufHdr); /* releases spinlock */ InvalidateBuffer(bufHdr); /* releases spinlock */
else else
UnlockBufHdr(bufHdr); UnlockBufHdr(bufHdr, buf_state);
} }
pfree(nodes); pfree(nodes);
...@@ -2951,6 +3034,7 @@ DropDatabaseBuffers(Oid dbid) ...@@ -2951,6 +3034,7 @@ DropDatabaseBuffers(Oid dbid)
for (i = 0; i < NBuffers; i++) for (i = 0; i < NBuffers; i++)
{ {
BufferDesc *bufHdr = GetBufferDescriptor(i); BufferDesc *bufHdr = GetBufferDescriptor(i);
uint32 buf_state;
/* /*
* As in DropRelFileNodeBuffers, an unlocked precheck should be safe * As in DropRelFileNodeBuffers, an unlocked precheck should be safe
...@@ -2959,11 +3043,11 @@ DropDatabaseBuffers(Oid dbid) ...@@ -2959,11 +3043,11 @@ DropDatabaseBuffers(Oid dbid)
if (bufHdr->tag.rnode.dbNode != dbid) if (bufHdr->tag.rnode.dbNode != dbid)
continue; continue;
LockBufHdr(bufHdr); buf_state = LockBufHdr(bufHdr);
if (bufHdr->tag.rnode.dbNode == dbid) if (bufHdr->tag.rnode.dbNode == dbid)
InvalidateBuffer(bufHdr); /* releases spinlock */ InvalidateBuffer(bufHdr); /* releases spinlock */
else else
UnlockBufHdr(bufHdr); UnlockBufHdr(bufHdr, buf_state);
} }
} }
...@@ -3055,9 +3139,12 @@ FlushRelationBuffers(Relation rel) ...@@ -3055,9 +3139,12 @@ FlushRelationBuffers(Relation rel)
{ {
for (i = 0; i < NLocBuffer; i++) for (i = 0; i < NLocBuffer; i++)
{ {
uint32 buf_state;
bufHdr = GetLocalBufferDescriptor(i); bufHdr = GetLocalBufferDescriptor(i);
if (RelFileNodeEquals(bufHdr->tag.rnode, rel->rd_node) && if (RelFileNodeEquals(bufHdr->tag.rnode, rel->rd_node) &&
(bufHdr->flags & BM_VALID) && (bufHdr->flags & BM_DIRTY)) ((buf_state = pg_atomic_read_u32(&bufHdr->state)) &
(BM_VALID | BM_DIRTY)) == (BM_VALID | BM_DIRTY))
{ {
ErrorContextCallback errcallback; ErrorContextCallback errcallback;
Page localpage; Page localpage;
...@@ -3078,7 +3165,8 @@ FlushRelationBuffers(Relation rel) ...@@ -3078,7 +3165,8 @@ FlushRelationBuffers(Relation rel)
localpage, localpage,
false); false);
bufHdr->flags &= ~(BM_DIRTY | BM_JUST_DIRTIED); buf_state &= ~(BM_DIRTY | BM_JUST_DIRTIED);
pg_atomic_write_u32(&bufHdr->state, buf_state);
/* Pop the error context stack */ /* Pop the error context stack */
error_context_stack = errcallback.previous; error_context_stack = errcallback.previous;
...@@ -3093,6 +3181,8 @@ FlushRelationBuffers(Relation rel) ...@@ -3093,6 +3181,8 @@ FlushRelationBuffers(Relation rel)
for (i = 0; i < NBuffers; i++) for (i = 0; i < NBuffers; i++)
{ {
uint32 buf_state;
bufHdr = GetBufferDescriptor(i); bufHdr = GetBufferDescriptor(i);
/* /*
...@@ -3104,9 +3194,9 @@ FlushRelationBuffers(Relation rel) ...@@ -3104,9 +3194,9 @@ FlushRelationBuffers(Relation rel)
ReservePrivateRefCountEntry(); ReservePrivateRefCountEntry();
LockBufHdr(bufHdr); buf_state = LockBufHdr(bufHdr);
if (RelFileNodeEquals(bufHdr->tag.rnode, rel->rd_node) && if (RelFileNodeEquals(bufHdr->tag.rnode, rel->rd_node) &&
(bufHdr->flags & BM_VALID) && (bufHdr->flags & BM_DIRTY)) (buf_state & (BM_VALID | BM_DIRTY)) == (BM_VALID | BM_DIRTY))
{ {
PinBuffer_Locked(bufHdr); PinBuffer_Locked(bufHdr);
LWLockAcquire(BufferDescriptorGetContentLock(bufHdr), LW_SHARED); LWLockAcquire(BufferDescriptorGetContentLock(bufHdr), LW_SHARED);
...@@ -3115,7 +3205,7 @@ FlushRelationBuffers(Relation rel) ...@@ -3115,7 +3205,7 @@ FlushRelationBuffers(Relation rel)
UnpinBuffer(bufHdr, true); UnpinBuffer(bufHdr, true);
} }
else else
UnlockBufHdr(bufHdr); UnlockBufHdr(bufHdr, buf_state);
} }
} }
...@@ -3145,6 +3235,8 @@ FlushDatabaseBuffers(Oid dbid) ...@@ -3145,6 +3235,8 @@ FlushDatabaseBuffers(Oid dbid)
for (i = 0; i < NBuffers; i++) for (i = 0; i < NBuffers; i++)
{ {
uint32 buf_state;
bufHdr = GetBufferDescriptor(i); bufHdr = GetBufferDescriptor(i);
/* /*
...@@ -3156,9 +3248,9 @@ FlushDatabaseBuffers(Oid dbid) ...@@ -3156,9 +3248,9 @@ FlushDatabaseBuffers(Oid dbid)
ReservePrivateRefCountEntry(); ReservePrivateRefCountEntry();
LockBufHdr(bufHdr); buf_state = LockBufHdr(bufHdr);
if (bufHdr->tag.rnode.dbNode == dbid && if (bufHdr->tag.rnode.dbNode == dbid &&
(bufHdr->flags & BM_VALID) && (bufHdr->flags & BM_DIRTY)) (buf_state & (BM_VALID | BM_DIRTY)) == (BM_VALID | BM_DIRTY))
{ {
PinBuffer_Locked(bufHdr); PinBuffer_Locked(bufHdr);
LWLockAcquire(BufferDescriptorGetContentLock(bufHdr), LW_SHARED); LWLockAcquire(BufferDescriptorGetContentLock(bufHdr), LW_SHARED);
...@@ -3167,7 +3259,7 @@ FlushDatabaseBuffers(Oid dbid) ...@@ -3167,7 +3259,7 @@ FlushDatabaseBuffers(Oid dbid)
UnpinBuffer(bufHdr, true); UnpinBuffer(bufHdr, true);
} }
else else
UnlockBufHdr(bufHdr); UnlockBufHdr(bufHdr, buf_state);
} }
} }
...@@ -3297,12 +3389,13 @@ MarkBufferDirtyHint(Buffer buffer, bool buffer_std) ...@@ -3297,12 +3389,13 @@ MarkBufferDirtyHint(Buffer buffer, bool buffer_std)
* is only intended to be used in cases where failing to write out the * is only intended to be used in cases where failing to write out the
* data would be harmless anyway, it doesn't really matter. * data would be harmless anyway, it doesn't really matter.
*/ */
if ((bufHdr->flags & (BM_DIRTY | BM_JUST_DIRTIED)) != if ((pg_atomic_read_u32(&bufHdr->state) & (BM_DIRTY | BM_JUST_DIRTIED)) !=
(BM_DIRTY | BM_JUST_DIRTIED)) (BM_DIRTY | BM_JUST_DIRTIED))
{ {
XLogRecPtr lsn = InvalidXLogRecPtr; XLogRecPtr lsn = InvalidXLogRecPtr;
bool dirtied = false; bool dirtied = false;
bool delayChkpt = false; bool delayChkpt = false;
uint32 buf_state;
/* /*
* If we need to protect hint bit updates from torn writes, WAL-log a * If we need to protect hint bit updates from torn writes, WAL-log a
...@@ -3313,7 +3406,8 @@ MarkBufferDirtyHint(Buffer buffer, bool buffer_std) ...@@ -3313,7 +3406,8 @@ MarkBufferDirtyHint(Buffer buffer, bool buffer_std)
* We don't check full_page_writes here because that logic is included * We don't check full_page_writes here because that logic is included
* when we call XLogInsert() since the value changes dynamically. * when we call XLogInsert() since the value changes dynamically.
*/ */
if (XLogHintBitIsNeeded() && (bufHdr->flags & BM_PERMANENT)) if (XLogHintBitIsNeeded() &&
(pg_atomic_read_u32(&bufHdr->state) & BM_PERMANENT))
{ {
/* /*
* If we're in recovery we cannot dirty a page because of a hint. * If we're in recovery we cannot dirty a page because of a hint.
...@@ -3352,9 +3446,11 @@ MarkBufferDirtyHint(Buffer buffer, bool buffer_std) ...@@ -3352,9 +3446,11 @@ MarkBufferDirtyHint(Buffer buffer, bool buffer_std)
lsn = XLogSaveBufferForHint(buffer, buffer_std); lsn = XLogSaveBufferForHint(buffer, buffer_std);
} }
LockBufHdr(bufHdr); buf_state = LockBufHdr(bufHdr);
Assert(bufHdr->refcount > 0);
if (!(bufHdr->flags & BM_DIRTY)) Assert(BUF_STATE_GET_REFCOUNT(buf_state) > 0);
if (!(buf_state & BM_DIRTY))
{ {
dirtied = true; /* Means "will be dirtied by this action" */ dirtied = true; /* Means "will be dirtied by this action" */
...@@ -3374,8 +3470,9 @@ MarkBufferDirtyHint(Buffer buffer, bool buffer_std) ...@@ -3374,8 +3470,9 @@ MarkBufferDirtyHint(Buffer buffer, bool buffer_std)
if (!XLogRecPtrIsInvalid(lsn)) if (!XLogRecPtrIsInvalid(lsn))
PageSetLSN(page, lsn); PageSetLSN(page, lsn);
} }
bufHdr->flags |= (BM_DIRTY | BM_JUST_DIRTIED);
UnlockBufHdr(bufHdr); buf_state |= BM_DIRTY | BM_JUST_DIRTIED;
UnlockBufHdr(bufHdr, buf_state);
if (delayChkpt) if (delayChkpt)
MyPgXact->delayChkpt = false; MyPgXact->delayChkpt = false;
...@@ -3406,17 +3503,19 @@ UnlockBuffers(void) ...@@ -3406,17 +3503,19 @@ UnlockBuffers(void)
if (buf) if (buf)
{ {
LockBufHdr(buf); uint32 buf_state;
buf_state = LockBufHdr(buf);
/* /*
* Don't complain if flag bit not set; it could have been reset but we * Don't complain if flag bit not set; it could have been reset but we
* got a cancel/die interrupt before getting the signal. * got a cancel/die interrupt before getting the signal.
*/ */
if ((buf->flags & BM_PIN_COUNT_WAITER) != 0 && if ((buf_state & BM_PIN_COUNT_WAITER) != 0 &&
buf->wait_backend_pid == MyProcPid) buf->wait_backend_pid == MyProcPid)
buf->flags &= ~BM_PIN_COUNT_WAITER; buf_state &= ~BM_PIN_COUNT_WAITER;
UnlockBufHdr(buf); UnlockBufHdr(buf, buf_state);
PinCountWaitBuf = NULL; PinCountWaitBuf = NULL;
} }
...@@ -3509,27 +3608,30 @@ LockBufferForCleanup(Buffer buffer) ...@@ -3509,27 +3608,30 @@ LockBufferForCleanup(Buffer buffer)
for (;;) for (;;)
{ {
uint32 buf_state;
/* Try to acquire lock */ /* Try to acquire lock */
LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE); LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE);
LockBufHdr(bufHdr); buf_state = LockBufHdr(bufHdr);
Assert(bufHdr->refcount > 0);
if (bufHdr->refcount == 1) Assert(BUF_STATE_GET_REFCOUNT(buf_state) > 0);
if (BUF_STATE_GET_REFCOUNT(buf_state) == 1)
{ {
/* Successfully acquired exclusive lock with pincount 1 */ /* Successfully acquired exclusive lock with pincount 1 */
UnlockBufHdr(bufHdr); UnlockBufHdr(bufHdr, buf_state);
return; return;
} }
/* Failed, so mark myself as waiting for pincount 1 */ /* Failed, so mark myself as waiting for pincount 1 */
if (bufHdr->flags & BM_PIN_COUNT_WAITER) if (buf_state & BM_PIN_COUNT_WAITER)
{ {
UnlockBufHdr(bufHdr); UnlockBufHdr(bufHdr, buf_state);
LockBuffer(buffer, BUFFER_LOCK_UNLOCK); LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
elog(ERROR, "multiple backends attempting to wait for pincount 1"); elog(ERROR, "multiple backends attempting to wait for pincount 1");
} }
bufHdr->wait_backend_pid = MyProcPid; bufHdr->wait_backend_pid = MyProcPid;
bufHdr->flags |= BM_PIN_COUNT_WAITER;
PinCountWaitBuf = bufHdr; PinCountWaitBuf = bufHdr;
UnlockBufHdr(bufHdr); buf_state |= BM_PIN_COUNT_WAITER;
UnlockBufHdr(bufHdr, buf_state);
LockBuffer(buffer, BUFFER_LOCK_UNLOCK); LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
/* Report the wait */ /* Report the wait */
...@@ -3558,11 +3660,11 @@ LockBufferForCleanup(Buffer buffer) ...@@ -3558,11 +3660,11 @@ LockBufferForCleanup(Buffer buffer)
* impossible with the current usages due to table level locking, but * impossible with the current usages due to table level locking, but
* better be safe. * better be safe.
*/ */
LockBufHdr(bufHdr); buf_state = LockBufHdr(bufHdr);
if ((bufHdr->flags & BM_PIN_COUNT_WAITER) != 0 && if ((buf_state & BM_PIN_COUNT_WAITER) != 0 &&
bufHdr->wait_backend_pid == MyProcPid) bufHdr->wait_backend_pid == MyProcPid)
bufHdr->flags &= ~BM_PIN_COUNT_WAITER; buf_state &= ~BM_PIN_COUNT_WAITER;
UnlockBufHdr(bufHdr); UnlockBufHdr(bufHdr, buf_state);
PinCountWaitBuf = NULL; PinCountWaitBuf = NULL;
/* Loop back and try again */ /* Loop back and try again */
...@@ -3603,22 +3705,26 @@ bool ...@@ -3603,22 +3705,26 @@ bool
ConditionalLockBufferForCleanup(Buffer buffer) ConditionalLockBufferForCleanup(Buffer buffer)
{ {
BufferDesc *bufHdr; BufferDesc *bufHdr;
uint32 buf_state,
refcount;
Assert(BufferIsValid(buffer)); Assert(BufferIsValid(buffer));
if (BufferIsLocal(buffer)) if (BufferIsLocal(buffer))
{ {
refcount = LocalRefCount[-buffer - 1];
/* There should be exactly one pin */ /* There should be exactly one pin */
Assert(LocalRefCount[-buffer - 1] > 0); Assert(refcount > 0);
if (LocalRefCount[-buffer - 1] != 1) if (refcount != 1)
return false; return false;
/* Nobody else to wait for */ /* Nobody else to wait for */
return true; return true;
} }
/* There should be exactly one local pin */ /* There should be exactly one local pin */
Assert(GetPrivateRefCount(buffer) > 0); refcount = GetPrivateRefCount(buffer);
if (GetPrivateRefCount(buffer) != 1) Assert(refcount);
if (refcount != 1)
return false; return false;
/* Try to acquire lock */ /* Try to acquire lock */
...@@ -3626,17 +3732,19 @@ ConditionalLockBufferForCleanup(Buffer buffer) ...@@ -3626,17 +3732,19 @@ ConditionalLockBufferForCleanup(Buffer buffer)
return false; return false;
bufHdr = GetBufferDescriptor(buffer - 1); bufHdr = GetBufferDescriptor(buffer - 1);
LockBufHdr(bufHdr); buf_state = LockBufHdr(bufHdr);
Assert(bufHdr->refcount > 0); refcount = BUF_STATE_GET_REFCOUNT(buf_state);
if (bufHdr->refcount == 1)
Assert(refcount > 0);
if (refcount == 1)
{ {
/* Successfully acquired exclusive lock with pincount 1 */ /* Successfully acquired exclusive lock with pincount 1 */
UnlockBufHdr(bufHdr); UnlockBufHdr(bufHdr, buf_state);
return true; return true;
} }
/* Failed, so release the lock */ /* Failed, so release the lock */
UnlockBufHdr(bufHdr); UnlockBufHdr(bufHdr, buf_state);
LockBuffer(buffer, BUFFER_LOCK_UNLOCK); LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
return false; return false;
} }
...@@ -3666,17 +3774,17 @@ WaitIO(BufferDesc *buf) ...@@ -3666,17 +3774,17 @@ WaitIO(BufferDesc *buf)
*/ */
for (;;) for (;;)
{ {
BufFlags sv_flags; uint32 buf_state;
/* /*
* It may not be necessary to acquire the spinlock to check the flag * It may not be necessary to acquire the spinlock to check the flag
* here, but since this test is essential for correctness, we'd better * here, but since this test is essential for correctness, we'd better
* play it safe. * play it safe.
*/ */
LockBufHdr(buf); buf_state = LockBufHdr(buf);
sv_flags = buf->flags; UnlockBufHdr(buf, buf_state);
UnlockBufHdr(buf);
if (!(sv_flags & BM_IO_IN_PROGRESS)) if (!(buf_state & BM_IO_IN_PROGRESS))
break; break;
LWLockAcquire(BufferDescriptorGetIOLock(buf), LW_SHARED); LWLockAcquire(BufferDescriptorGetIOLock(buf), LW_SHARED);
LWLockRelease(BufferDescriptorGetIOLock(buf)); LWLockRelease(BufferDescriptorGetIOLock(buf));
...@@ -3704,6 +3812,8 @@ WaitIO(BufferDesc *buf) ...@@ -3704,6 +3812,8 @@ WaitIO(BufferDesc *buf)
static bool static bool
StartBufferIO(BufferDesc *buf, bool forInput) StartBufferIO(BufferDesc *buf, bool forInput)
{ {
uint32 buf_state;
Assert(!InProgressBuf); Assert(!InProgressBuf);
for (;;) for (;;)
...@@ -3714,9 +3824,9 @@ StartBufferIO(BufferDesc *buf, bool forInput) ...@@ -3714,9 +3824,9 @@ StartBufferIO(BufferDesc *buf, bool forInput)
*/ */
LWLockAcquire(BufferDescriptorGetIOLock(buf), LW_EXCLUSIVE); LWLockAcquire(BufferDescriptorGetIOLock(buf), LW_EXCLUSIVE);
LockBufHdr(buf); buf_state = LockBufHdr(buf);
if (!(buf->flags & BM_IO_IN_PROGRESS)) if (!(buf_state & BM_IO_IN_PROGRESS))
break; break;
/* /*
...@@ -3725,24 +3835,23 @@ StartBufferIO(BufferDesc *buf, bool forInput) ...@@ -3725,24 +3835,23 @@ StartBufferIO(BufferDesc *buf, bool forInput)
* an error (see AbortBufferIO). If that's the case, we must wait for * an error (see AbortBufferIO). If that's the case, we must wait for
* him to get unwedged. * him to get unwedged.
*/ */
UnlockBufHdr(buf); UnlockBufHdr(buf, buf_state);
LWLockRelease(BufferDescriptorGetIOLock(buf)); LWLockRelease(BufferDescriptorGetIOLock(buf));
WaitIO(buf); WaitIO(buf);
} }
/* Once we get here, there is definitely no I/O active on this buffer */ /* Once we get here, there is definitely no I/O active on this buffer */
if (forInput ? (buf->flags & BM_VALID) : !(buf->flags & BM_DIRTY)) if (forInput ? (buf_state & BM_VALID) : !(buf_state & BM_DIRTY))
{ {
/* someone else already did the I/O */ /* someone else already did the I/O */
UnlockBufHdr(buf); UnlockBufHdr(buf, buf_state);
LWLockRelease(BufferDescriptorGetIOLock(buf)); LWLockRelease(BufferDescriptorGetIOLock(buf));
return false; return false;
} }
buf->flags |= BM_IO_IN_PROGRESS; buf_state |= BM_IO_IN_PROGRESS;
UnlockBufHdr(buf, buf_state);
UnlockBufHdr(buf);
InProgressBuf = buf; InProgressBuf = buf;
IsForInput = forInput; IsForInput = forInput;
...@@ -3768,19 +3877,22 @@ StartBufferIO(BufferDesc *buf, bool forInput) ...@@ -3768,19 +3877,22 @@ StartBufferIO(BufferDesc *buf, bool forInput)
* be 0, or BM_VALID if we just finished reading in the page. * be 0, or BM_VALID if we just finished reading in the page.
*/ */
static void static void
TerminateBufferIO(BufferDesc *buf, bool clear_dirty, int set_flag_bits) TerminateBufferIO(BufferDesc *buf, bool clear_dirty, uint32 set_flag_bits)
{ {
uint32 buf_state;
Assert(buf == InProgressBuf); Assert(buf == InProgressBuf);
LockBufHdr(buf); buf_state = LockBufHdr(buf);
Assert(buf_state & BM_IO_IN_PROGRESS);
Assert(buf->flags & BM_IO_IN_PROGRESS); buf_state &= ~(BM_IO_IN_PROGRESS | BM_IO_ERROR);
buf->flags &= ~(BM_IO_IN_PROGRESS | BM_IO_ERROR); if (clear_dirty && !(buf_state & BM_JUST_DIRTIED))
if (clear_dirty && !(buf->flags & BM_JUST_DIRTIED)) buf_state &= ~(BM_DIRTY | BM_CHECKPOINT_NEEDED);
buf->flags &= ~(BM_DIRTY | BM_CHECKPOINT_NEEDED);
buf->flags |= set_flag_bits;
UnlockBufHdr(buf); buf_state |= set_flag_bits;
UnlockBufHdr(buf, buf_state);
InProgressBuf = NULL; InProgressBuf = NULL;
...@@ -3803,6 +3915,8 @@ AbortBufferIO(void) ...@@ -3803,6 +3915,8 @@ AbortBufferIO(void)
if (buf) if (buf)
{ {
uint32 buf_state;
/* /*
* Since LWLockReleaseAll has already been called, we're not holding * Since LWLockReleaseAll has already been called, we're not holding
* the buffer's io_in_progress_lock. We have to re-acquire it so that * the buffer's io_in_progress_lock. We have to re-acquire it so that
...@@ -3811,24 +3925,22 @@ AbortBufferIO(void) ...@@ -3811,24 +3925,22 @@ AbortBufferIO(void)
*/ */
LWLockAcquire(BufferDescriptorGetIOLock(buf), LW_EXCLUSIVE); LWLockAcquire(BufferDescriptorGetIOLock(buf), LW_EXCLUSIVE);
LockBufHdr(buf); buf_state = LockBufHdr(buf);
Assert(buf->flags & BM_IO_IN_PROGRESS); Assert(buf_state & BM_IO_IN_PROGRESS);
if (IsForInput) if (IsForInput)
{ {
Assert(!(buf->flags & BM_DIRTY)); Assert(!(buf_state & BM_DIRTY));
/* We'd better not think buffer is valid yet */ /* We'd better not think buffer is valid yet */
Assert(!(buf->flags & BM_VALID)); Assert(!(buf_state & BM_VALID));
UnlockBufHdr(buf); UnlockBufHdr(buf, buf_state);
} }
else else
{ {
BufFlags sv_flags; Assert(buf_state & BM_DIRTY);
UnlockBufHdr(buf, buf_state);
sv_flags = buf->flags;
Assert(sv_flags & BM_DIRTY);
UnlockBufHdr(buf);
/* Issue notice if this is not the first failure... */ /* Issue notice if this is not the first failure... */
if (sv_flags & BM_IO_ERROR) if (buf_state & BM_IO_ERROR)
{ {
/* Buffer is pinned, so we can read tag without spinlock */ /* Buffer is pinned, so we can read tag without spinlock */
char *path; char *path;
...@@ -3911,6 +4023,54 @@ rnode_comparator(const void *p1, const void *p2) ...@@ -3911,6 +4023,54 @@ rnode_comparator(const void *p1, const void *p2)
return 0; return 0;
} }
/*
* Lock buffer header - set BM_LOCKED in buffer state.
*/
uint32
LockBufHdr(BufferDesc *desc)
{
SpinDelayStatus delayStatus = init_spin_delay(desc);
uint32 old_buf_state;
while (true)
{
/* set BM_LOCKED flag */
old_buf_state = pg_atomic_fetch_or_u32(&desc->state, BM_LOCKED);
/* if it wasn't set before we're OK */
if (!(old_buf_state & BM_LOCKED))
break;
perform_spin_delay(&delayStatus);
}
finish_spin_delay(&delayStatus);
return old_buf_state | BM_LOCKED;
}
/*
* Wait until the BM_LOCKED flag isn't set anymore and return the buffer's
* state at that point.
*
* Obviously the buffer could be locked by the time the value is returned, so
* this is primarily useful in CAS style loops.
*/
static uint32
WaitBufHdrUnlocked(BufferDesc *buf)
{
SpinDelayStatus delayStatus = init_spin_delay(buf);
uint32 buf_state;
buf_state = pg_atomic_read_u32(&buf->state);
while (buf_state & BM_LOCKED)
{
perform_spin_delay(&delayStatus);
buf_state = pg_atomic_read_u32(&buf->state);
}
finish_spin_delay(&delayStatus);
return buf_state;
}
/* /*
* BufferTag comparator. * BufferTag comparator.
*/ */
......
...@@ -98,7 +98,8 @@ typedef struct BufferAccessStrategyData ...@@ -98,7 +98,8 @@ typedef struct BufferAccessStrategyData
/* Prototypes for internal functions */ /* Prototypes for internal functions */
static BufferDesc *GetBufferFromRing(BufferAccessStrategy strategy); static BufferDesc *GetBufferFromRing(BufferAccessStrategy strategy,
uint32 *buf_state);
static void AddBufferToRing(BufferAccessStrategy strategy, static void AddBufferToRing(BufferAccessStrategy strategy,
BufferDesc *buf); BufferDesc *buf);
...@@ -180,11 +181,12 @@ ClockSweepTick(void) ...@@ -180,11 +181,12 @@ ClockSweepTick(void)
* return the buffer with the buffer header spinlock still held. * return the buffer with the buffer header spinlock still held.
*/ */
BufferDesc * BufferDesc *
StrategyGetBuffer(BufferAccessStrategy strategy) StrategyGetBuffer(BufferAccessStrategy strategy, uint32 *buf_state)
{ {
BufferDesc *buf; BufferDesc *buf;
int bgwprocno; int bgwprocno;
int trycounter; int trycounter;
uint32 local_buf_state; /* to avoid repeated (de-)referencing */
/* /*
* If given a strategy object, see whether it can select a buffer. We * If given a strategy object, see whether it can select a buffer. We
...@@ -192,7 +194,7 @@ StrategyGetBuffer(BufferAccessStrategy strategy) ...@@ -192,7 +194,7 @@ StrategyGetBuffer(BufferAccessStrategy strategy)
*/ */
if (strategy != NULL) if (strategy != NULL)
{ {
buf = GetBufferFromRing(strategy); buf = GetBufferFromRing(strategy, buf_state);
if (buf != NULL) if (buf != NULL)
return buf; return buf;
} }
...@@ -279,14 +281,16 @@ StrategyGetBuffer(BufferAccessStrategy strategy) ...@@ -279,14 +281,16 @@ StrategyGetBuffer(BufferAccessStrategy strategy)
* it before we got to it. It's probably impossible altogether as * it before we got to it. It's probably impossible altogether as
* of 8.3, but we'd better check anyway.) * of 8.3, but we'd better check anyway.)
*/ */
LockBufHdr(buf); local_buf_state = LockBufHdr(buf);
if (buf->refcount == 0 && buf->usage_count == 0) if (BUF_STATE_GET_REFCOUNT(local_buf_state) == 0
&& BUF_STATE_GET_USAGECOUNT(local_buf_state) == 0)
{ {
if (strategy != NULL) if (strategy != NULL)
AddBufferToRing(strategy, buf); AddBufferToRing(strategy, buf);
*buf_state = local_buf_state;
return buf; return buf;
} }
UnlockBufHdr(buf); UnlockBufHdr(buf, local_buf_state);
} }
} }
...@@ -295,19 +299,20 @@ StrategyGetBuffer(BufferAccessStrategy strategy) ...@@ -295,19 +299,20 @@ StrategyGetBuffer(BufferAccessStrategy strategy)
trycounter = NBuffers; trycounter = NBuffers;
for (;;) for (;;)
{ {
buf = GetBufferDescriptor(ClockSweepTick()); buf = GetBufferDescriptor(ClockSweepTick());
/* /*
* If the buffer is pinned or has a nonzero usage_count, we cannot use * If the buffer is pinned or has a nonzero usage_count, we cannot use
* it; decrement the usage_count (unless pinned) and keep scanning. * it; decrement the usage_count (unless pinned) and keep scanning.
*/ */
LockBufHdr(buf); local_buf_state = LockBufHdr(buf);
if (buf->refcount == 0)
if (BUF_STATE_GET_REFCOUNT(local_buf_state) == 0)
{ {
if (buf->usage_count > 0) if (BUF_STATE_GET_USAGECOUNT(local_buf_state) != 0)
{ {
buf->usage_count--; local_buf_state -= BUF_USAGECOUNT_ONE;
trycounter = NBuffers; trycounter = NBuffers;
} }
else else
...@@ -315,6 +320,7 @@ StrategyGetBuffer(BufferAccessStrategy strategy) ...@@ -315,6 +320,7 @@ StrategyGetBuffer(BufferAccessStrategy strategy)
/* Found a usable buffer */ /* Found a usable buffer */
if (strategy != NULL) if (strategy != NULL)
AddBufferToRing(strategy, buf); AddBufferToRing(strategy, buf);
*buf_state = local_buf_state;
return buf; return buf;
} }
} }
...@@ -327,10 +333,10 @@ StrategyGetBuffer(BufferAccessStrategy strategy) ...@@ -327,10 +333,10 @@ StrategyGetBuffer(BufferAccessStrategy strategy)
* probably better to fail than to risk getting stuck in an * probably better to fail than to risk getting stuck in an
* infinite loop. * infinite loop.
*/ */
UnlockBufHdr(buf); UnlockBufHdr(buf, local_buf_state);
elog(ERROR, "no unpinned buffers available"); elog(ERROR, "no unpinned buffers available");
} }
UnlockBufHdr(buf); UnlockBufHdr(buf, local_buf_state);
} }
} }
...@@ -585,10 +591,12 @@ FreeAccessStrategy(BufferAccessStrategy strategy) ...@@ -585,10 +591,12 @@ FreeAccessStrategy(BufferAccessStrategy strategy)
* The bufhdr spin lock is held on the returned buffer. * The bufhdr spin lock is held on the returned buffer.
*/ */
static BufferDesc * static BufferDesc *
GetBufferFromRing(BufferAccessStrategy strategy) GetBufferFromRing(BufferAccessStrategy strategy, uint32 *buf_state)
{ {
BufferDesc *buf; BufferDesc *buf;
Buffer bufnum; Buffer bufnum;
uint32 local_buf_state; /* to avoid repeated (de-)referencing */
/* Advance to next ring slot */ /* Advance to next ring slot */
if (++strategy->current >= strategy->ring_size) if (++strategy->current >= strategy->ring_size)
...@@ -616,13 +624,15 @@ GetBufferFromRing(BufferAccessStrategy strategy) ...@@ -616,13 +624,15 @@ GetBufferFromRing(BufferAccessStrategy strategy)
* shouldn't re-use it. * shouldn't re-use it.
*/ */
buf = GetBufferDescriptor(bufnum - 1); buf = GetBufferDescriptor(bufnum - 1);
LockBufHdr(buf); local_buf_state = LockBufHdr(buf);
if (buf->refcount == 0 && buf->usage_count <= 1) if (BUF_STATE_GET_REFCOUNT(local_buf_state) == 0
&& BUF_STATE_GET_USAGECOUNT(local_buf_state) <= 1)
{ {
strategy->current_was_in_ring = true; strategy->current_was_in_ring = true;
*buf_state = local_buf_state;
return buf; return buf;
} }
UnlockBufHdr(buf); UnlockBufHdr(buf, local_buf_state);
/* /*
* Tell caller to allocate a new buffer with the normal allocation * Tell caller to allocate a new buffer with the normal allocation
......
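For context, a caller of the new two-argument StrategyGetBuffer API would look roughly like the hypothetical fragment below (the real caller is BufferAlloc in bufmgr.c; this is only a sketch). The buffer header spinlock is still held on return, and the returned state word is handed back when releasing it:

    uint32      buf_state;
    BufferDesc *buf;

    buf = StrategyGetBuffer(strategy, &buf_state);

    /* header spinlock is still held here; buf_state is its locked value */
    Assert(BUF_STATE_GET_REFCOUNT(buf_state) == 0);

    /* ... claim the buffer for the new tag ... */

    UnlockBufHdr(buf, buf_state);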
...@@ -108,6 +108,7 @@ LocalBufferAlloc(SMgrRelation smgr, ForkNumber forkNum, BlockNumber blockNum, ...@@ -108,6 +108,7 @@ LocalBufferAlloc(SMgrRelation smgr, ForkNumber forkNum, BlockNumber blockNum,
int b; int b;
int trycounter; int trycounter;
bool found; bool found;
uint32 buf_state;
INIT_BUFFERTAG(newTag, smgr->smgr_rnode.node, forkNum, blockNum); INIT_BUFFERTAG(newTag, smgr->smgr_rnode.node, forkNum, blockNum);
...@@ -128,16 +129,21 @@ LocalBufferAlloc(SMgrRelation smgr, ForkNumber forkNum, BlockNumber blockNum, ...@@ -128,16 +129,21 @@ LocalBufferAlloc(SMgrRelation smgr, ForkNumber forkNum, BlockNumber blockNum,
fprintf(stderr, "LB ALLOC (%u,%d,%d) %d\n", fprintf(stderr, "LB ALLOC (%u,%d,%d) %d\n",
smgr->smgr_rnode.node.relNode, forkNum, blockNum, -b - 1); smgr->smgr_rnode.node.relNode, forkNum, blockNum, -b - 1);
#endif #endif
buf_state = pg_atomic_read_u32(&bufHdr->state);
/* this part is equivalent to PinBuffer for a shared buffer */ /* this part is equivalent to PinBuffer for a shared buffer */
if (LocalRefCount[b] == 0) if (LocalRefCount[b] == 0)
{ {
if (bufHdr->usage_count < BM_MAX_USAGE_COUNT) if (BUF_STATE_GET_USAGECOUNT(buf_state) < BM_MAX_USAGE_COUNT)
bufHdr->usage_count++; {
buf_state += BUF_USAGECOUNT_ONE;
pg_atomic_write_u32(&bufHdr->state, buf_state);
}
} }
LocalRefCount[b]++; LocalRefCount[b]++;
ResourceOwnerRememberBuffer(CurrentResourceOwner, ResourceOwnerRememberBuffer(CurrentResourceOwner,
BufferDescriptorGetBuffer(bufHdr)); BufferDescriptorGetBuffer(bufHdr));
if (bufHdr->flags & BM_VALID) if (buf_state & BM_VALID)
*foundPtr = TRUE; *foundPtr = TRUE;
else else
{ {
...@@ -169,9 +175,12 @@ LocalBufferAlloc(SMgrRelation smgr, ForkNumber forkNum, BlockNumber blockNum, ...@@ -169,9 +175,12 @@ LocalBufferAlloc(SMgrRelation smgr, ForkNumber forkNum, BlockNumber blockNum,
if (LocalRefCount[b] == 0) if (LocalRefCount[b] == 0)
{ {
if (bufHdr->usage_count > 0) buf_state = pg_atomic_read_u32(&bufHdr->state);
if (BUF_STATE_GET_USAGECOUNT(buf_state) > 0)
{ {
bufHdr->usage_count--; buf_state -= BUF_USAGECOUNT_ONE;
pg_atomic_write_u32(&bufHdr->state, buf_state);
trycounter = NLocBuffer; trycounter = NLocBuffer;
} }
else else
...@@ -193,7 +202,7 @@ LocalBufferAlloc(SMgrRelation smgr, ForkNumber forkNum, BlockNumber blockNum, ...@@ -193,7 +202,7 @@ LocalBufferAlloc(SMgrRelation smgr, ForkNumber forkNum, BlockNumber blockNum,
* this buffer is not referenced but it might still be dirty. if that's * this buffer is not referenced but it might still be dirty. if that's
* the case, write it out before reusing it! * the case, write it out before reusing it!
*/ */
if (bufHdr->flags & BM_DIRTY) if (buf_state & BM_DIRTY)
{ {
SMgrRelation oreln; SMgrRelation oreln;
Page localpage = (char *) LocalBufHdrGetBlock(bufHdr); Page localpage = (char *) LocalBufHdrGetBlock(bufHdr);
...@@ -211,7 +220,8 @@ LocalBufferAlloc(SMgrRelation smgr, ForkNumber forkNum, BlockNumber blockNum, ...@@ -211,7 +220,8 @@ LocalBufferAlloc(SMgrRelation smgr, ForkNumber forkNum, BlockNumber blockNum,
false); false);
/* Mark not-dirty now in case we error out below */ /* Mark not-dirty now in case we error out below */
bufHdr->flags &= ~BM_DIRTY; buf_state &= ~BM_DIRTY;
pg_atomic_write_u32(&bufHdr->state, buf_state);
pgBufferUsage.local_blks_written++; pgBufferUsage.local_blks_written++;
} }
...@@ -228,7 +238,7 @@ LocalBufferAlloc(SMgrRelation smgr, ForkNumber forkNum, BlockNumber blockNum, ...@@ -228,7 +238,7 @@ LocalBufferAlloc(SMgrRelation smgr, ForkNumber forkNum, BlockNumber blockNum,
/* /*
* Update the hash table: remove old entry, if any, and make new one. * Update the hash table: remove old entry, if any, and make new one.
*/ */
if (bufHdr->flags & BM_TAG_VALID) if (buf_state & BM_TAG_VALID)
{ {
hresult = (LocalBufferLookupEnt *) hresult = (LocalBufferLookupEnt *)
hash_search(LocalBufHash, (void *) &bufHdr->tag, hash_search(LocalBufHash, (void *) &bufHdr->tag,
...@@ -237,7 +247,8 @@ LocalBufferAlloc(SMgrRelation smgr, ForkNumber forkNum, BlockNumber blockNum, ...@@ -237,7 +247,8 @@ LocalBufferAlloc(SMgrRelation smgr, ForkNumber forkNum, BlockNumber blockNum,
elog(ERROR, "local buffer hash table corrupted"); elog(ERROR, "local buffer hash table corrupted");
/* mark buffer invalid just in case hash insert fails */ /* mark buffer invalid just in case hash insert fails */
CLEAR_BUFFERTAG(bufHdr->tag); CLEAR_BUFFERTAG(bufHdr->tag);
bufHdr->flags &= ~(BM_VALID | BM_TAG_VALID); buf_state &= ~(BM_VALID | BM_TAG_VALID);
pg_atomic_write_u32(&bufHdr->state, buf_state);
} }
hresult = (LocalBufferLookupEnt *) hresult = (LocalBufferLookupEnt *)
...@@ -250,9 +261,11 @@ LocalBufferAlloc(SMgrRelation smgr, ForkNumber forkNum, BlockNumber blockNum, ...@@ -250,9 +261,11 @@ LocalBufferAlloc(SMgrRelation smgr, ForkNumber forkNum, BlockNumber blockNum,
* it's all ours now. * it's all ours now.
*/ */
bufHdr->tag = newTag; bufHdr->tag = newTag;
bufHdr->flags &= ~(BM_VALID | BM_DIRTY | BM_JUST_DIRTIED | BM_IO_ERROR); buf_state &= ~(BM_VALID | BM_DIRTY | BM_JUST_DIRTIED | BM_IO_ERROR);
bufHdr->flags |= BM_TAG_VALID; buf_state |= BM_TAG_VALID;
bufHdr->usage_count = 1; buf_state &= ~BUF_USAGECOUNT_MASK;
buf_state += BUF_USAGECOUNT_ONE;
pg_atomic_write_u32(&bufHdr->state, buf_state);
*foundPtr = FALSE; *foundPtr = FALSE;
return bufHdr; return bufHdr;
...@@ -267,6 +280,7 @@ MarkLocalBufferDirty(Buffer buffer) ...@@ -267,6 +280,7 @@ MarkLocalBufferDirty(Buffer buffer)
{ {
int bufid; int bufid;
BufferDesc *bufHdr; BufferDesc *bufHdr;
uint32 buf_state;
Assert(BufferIsLocal(buffer)); Assert(BufferIsLocal(buffer));
...@@ -280,10 +294,10 @@ MarkLocalBufferDirty(Buffer buffer) ...@@ -280,10 +294,10 @@ MarkLocalBufferDirty(Buffer buffer)
bufHdr = GetLocalBufferDescriptor(bufid); bufHdr = GetLocalBufferDescriptor(bufid);
if (!(bufHdr->flags & BM_DIRTY)) buf_state = pg_atomic_fetch_or_u32(&bufHdr->state, BM_DIRTY);
pgBufferUsage.local_blks_dirtied++;
bufHdr->flags |= BM_DIRTY; if (!(buf_state & BM_DIRTY))
pgBufferUsage.local_blks_dirtied++;
} }
/* /*
...@@ -307,8 +321,11 @@ DropRelFileNodeLocalBuffers(RelFileNode rnode, ForkNumber forkNum, ...@@ -307,8 +321,11 @@ DropRelFileNodeLocalBuffers(RelFileNode rnode, ForkNumber forkNum,
{ {
BufferDesc *bufHdr = GetLocalBufferDescriptor(i); BufferDesc *bufHdr = GetLocalBufferDescriptor(i);
LocalBufferLookupEnt *hresult; LocalBufferLookupEnt *hresult;
uint32 buf_state;
if ((bufHdr->flags & BM_TAG_VALID) && buf_state = pg_atomic_read_u32(&bufHdr->state);
if ((buf_state & BM_TAG_VALID) &&
RelFileNodeEquals(bufHdr->tag.rnode, rnode) && RelFileNodeEquals(bufHdr->tag.rnode, rnode) &&
bufHdr->tag.forkNum == forkNum && bufHdr->tag.forkNum == forkNum &&
bufHdr->tag.blockNum >= firstDelBlock) bufHdr->tag.blockNum >= firstDelBlock)
...@@ -327,8 +344,9 @@ DropRelFileNodeLocalBuffers(RelFileNode rnode, ForkNumber forkNum, ...@@ -327,8 +344,9 @@ DropRelFileNodeLocalBuffers(RelFileNode rnode, ForkNumber forkNum,
elog(ERROR, "local buffer hash table corrupted"); elog(ERROR, "local buffer hash table corrupted");
/* Mark buffer invalid */ /* Mark buffer invalid */
CLEAR_BUFFERTAG(bufHdr->tag); CLEAR_BUFFERTAG(bufHdr->tag);
bufHdr->flags = 0; buf_state &= ~BUF_FLAG_MASK;
bufHdr->usage_count = 0; buf_state &= ~BUF_USAGECOUNT_MASK;
pg_atomic_write_u32(&bufHdr->state, buf_state);
} }
} }
} }
...@@ -349,8 +367,11 @@ DropRelFileNodeAllLocalBuffers(RelFileNode rnode) ...@@ -349,8 +367,11 @@ DropRelFileNodeAllLocalBuffers(RelFileNode rnode)
{ {
BufferDesc *bufHdr = GetLocalBufferDescriptor(i); BufferDesc *bufHdr = GetLocalBufferDescriptor(i);
LocalBufferLookupEnt *hresult; LocalBufferLookupEnt *hresult;
uint32 buf_state;
buf_state = pg_atomic_read_u32(&bufHdr->state);
if ((bufHdr->flags & BM_TAG_VALID) && if ((buf_state & BM_TAG_VALID) &&
RelFileNodeEquals(bufHdr->tag.rnode, rnode)) RelFileNodeEquals(bufHdr->tag.rnode, rnode))
{ {
if (LocalRefCount[i] != 0) if (LocalRefCount[i] != 0)
...@@ -367,8 +388,9 @@ DropRelFileNodeAllLocalBuffers(RelFileNode rnode) ...@@ -367,8 +388,9 @@ DropRelFileNodeAllLocalBuffers(RelFileNode rnode)
elog(ERROR, "local buffer hash table corrupted"); elog(ERROR, "local buffer hash table corrupted");
/* Mark buffer invalid */ /* Mark buffer invalid */
CLEAR_BUFFERTAG(bufHdr->tag); CLEAR_BUFFERTAG(bufHdr->tag);
bufHdr->flags = 0; buf_state &= ~BUF_FLAG_MASK;
bufHdr->usage_count = 0; buf_state &= ~BUF_USAGECOUNT_MASK;
pg_atomic_write_u32(&bufHdr->state, buf_state);
} }
} }
} }
......
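A point worth noting about the localbuf.c changes: local buffers are backend-private, so no other process can race with these updates. A plain read-modify-write of the atomic state word is therefore sufficient; neither the header lock nor a CAS loop is needed, e.g. (illustrative fragment):

    uint32      buf_state = pg_atomic_read_u32(&bufHdr->state);

    buf_state |= BM_DIRTY;      /* unlocked modification is safe for local buffers */
    pg_atomic_write_u32(&bufHdr->state, buf_state);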
...@@ -3,6 +3,38 @@ ...@@ -3,6 +3,38 @@
* s_lock.c * s_lock.c
* Hardware-dependent implementation of spinlocks. * Hardware-dependent implementation of spinlocks.
* *
* When waiting for a contended spinlock we loop tightly for awhile, then
* delay using pg_usleep() and try again. Preferably, "awhile" should be a
* small multiple of the maximum time we expect a spinlock to be held. 100
* iterations seems about right as an initial guess. However, on a
* uniprocessor the loop is a waste of cycles, while in a multi-CPU scenario
* it's usually better to spin a bit longer than to call the kernel, so we try
* to adapt the spin loop count depending on whether we seem to be in a
* uniprocessor or multiprocessor.
*
* Note: you might think MIN_SPINS_PER_DELAY should be just 1, but you'd
* be wrong; there are platforms where that can result in a "stuck
* spinlock" failure. This has been seen particularly on Alphas; it seems
* that the first TAS after returning from kernel space will always fail
* on that hardware.
*
* Once we do decide to block, we use randomly increasing pg_usleep()
* delays. The first delay is 1 msec, then the delay randomly increases to
* about one second, after which we reset to 1 msec and start again. The
* idea here is that in the presence of heavy contention we need to
* increase the delay, else the spinlock holder may never get to run and
* release the lock. (Consider situation where spinlock holder has been
* nice'd down in priority by the scheduler --- it will not get scheduled
* until all would-be acquirers are sleeping, so if we always use a 1-msec
* sleep, there is a real possibility of starvation.) But we can't just
* clamp the delay to an upper bound, else it would take a long time to
* make a reasonable number of tries.
*
* We time out and declare error after NUM_DELAYS delays (thus, exactly
* that many tries). With the given settings, this will usually take 2 or
* so minutes. It seems better to fix the total number of tries (and thus
* the probability of unintended failure) than to fix the total time
* spent.
* *
* Portions Copyright (c) 1996-2016, PostgreSQL Global Development Group * Portions Copyright (c) 1996-2016, PostgreSQL Global Development Group
* Portions Copyright (c) 1994, Regents of the University of California * Portions Copyright (c) 1994, Regents of the University of California
...@@ -21,6 +53,14 @@ ...@@ -21,6 +53,14 @@
#include "storage/s_lock.h" #include "storage/s_lock.h"
#include "storage/barrier.h" #include "storage/barrier.h"
#define MIN_SPINS_PER_DELAY 10
#define MAX_SPINS_PER_DELAY 1000
#define NUM_DELAYS 1000
#define MIN_DELAY_USEC 1000L
#define MAX_DELAY_USEC 1000000L
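A rough back-of-the-envelope check of the "2 or so minutes" figure in the comment above (an estimate, not taken from the source): each sleep starts at MIN_DELAY_USEC (1 ms) and grows by a random factor between 1x and 2x, about 1.5x on average, so one cycle from 1 ms up to MAX_DELAY_USEC (1 s) takes roughly log1.5(1000) ≈ 17 sleeps and sums to about 2 seconds; NUM_DELAYS = 1000 therefore allows roughly 1000 / 17 ≈ 59 such cycles, i.e. on the order of two minutes, before s_lock_stuck() reports a stuck spinlock.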
slock_t dummy_spinlock; slock_t dummy_spinlock;
static int spins_per_delay = DEFAULT_SPINS_PER_DELAY; static int spins_per_delay = DEFAULT_SPINS_PER_DELAY;
...@@ -30,117 +70,107 @@ static int spins_per_delay = DEFAULT_SPINS_PER_DELAY; ...@@ -30,117 +70,107 @@ static int spins_per_delay = DEFAULT_SPINS_PER_DELAY;
* s_lock_stuck() - complain about a stuck spinlock * s_lock_stuck() - complain about a stuck spinlock
*/ */
static void static void
s_lock_stuck(volatile slock_t *lock, const char *file, int line) s_lock_stuck(void *p, const char *file, int line)
{ {
#if defined(S_LOCK_TEST) #if defined(S_LOCK_TEST)
fprintf(stderr, fprintf(stderr,
"\nStuck spinlock (%p) detected at %s:%d.\n", "\nStuck spinlock (%p) detected at %s:%d.\n",
lock, file, line); p, file, line);
exit(1); exit(1);
#else #else
elog(PANIC, "stuck spinlock (%p) detected at %s:%d", elog(PANIC, "stuck spinlock (%p) detected at %s:%d",
lock, file, line); p, file, line);
#endif #endif
} }
/* /*
* s_lock(lock) - platform-independent portion of waiting for a spinlock. * s_lock(lock) - platform-independent portion of waiting for a spinlock.
*/ */
int int
s_lock(volatile slock_t *lock, const char *file, int line) s_lock(volatile slock_t *lock, const char *file, int line)
{ {
/* SpinDelayStatus delayStatus = init_spin_delay((void *) lock);
* We loop tightly for awhile, then delay using pg_usleep() and try again.
* Preferably, "awhile" should be a small multiple of the maximum time we
* expect a spinlock to be held. 100 iterations seems about right as an
* initial guess. However, on a uniprocessor the loop is a waste of
* cycles, while in a multi-CPU scenario it's usually better to spin a bit
* longer than to call the kernel, so we try to adapt the spin loop count
* depending on whether we seem to be in a uniprocessor or multiprocessor.
*
* Note: you might think MIN_SPINS_PER_DELAY should be just 1, but you'd
* be wrong; there are platforms where that can result in a "stuck
* spinlock" failure. This has been seen particularly on Alphas; it seems
* that the first TAS after returning from kernel space will always fail
* on that hardware.
*
* Once we do decide to block, we use randomly increasing pg_usleep()
* delays. The first delay is 1 msec, then the delay randomly increases to
* about one second, after which we reset to 1 msec and start again. The
* idea here is that in the presence of heavy contention we need to
* increase the delay, else the spinlock holder may never get to run and
* release the lock. (Consider situation where spinlock holder has been
* nice'd down in priority by the scheduler --- it will not get scheduled
* until all would-be acquirers are sleeping, so if we always use a 1-msec
* sleep, there is a real possibility of starvation.) But we can't just
* clamp the delay to an upper bound, else it would take a long time to
* make a reasonable number of tries.
*
* We time out and declare error after NUM_DELAYS delays (thus, exactly
* that many tries). With the given settings, this will usually take 2 or
* so minutes. It seems better to fix the total number of tries (and thus
* the probability of unintended failure) than to fix the total time
* spent.
*/
#define MIN_SPINS_PER_DELAY 10
#define MAX_SPINS_PER_DELAY 1000
#define NUM_DELAYS 1000
#define MIN_DELAY_USEC 1000L
#define MAX_DELAY_USEC 1000000L
int spins = 0;
int delays = 0;
int cur_delay = 0;
while (TAS_SPIN(lock)) while (TAS_SPIN(lock))
{ {
/* CPU-specific delay each time through the loop */ perform_spin_delay(&delayStatus);
SPIN_DELAY(); }
/* Block the process every spins_per_delay tries */ finish_spin_delay(&delayStatus);
if (++spins >= spins_per_delay)
{
if (++delays > NUM_DELAYS)
s_lock_stuck(lock, file, line);
if (cur_delay == 0) /* first time to delay? */ return delayStatus.delays;
cur_delay = MIN_DELAY_USEC; }
pg_usleep(cur_delay); #ifdef USE_DEFAULT_S_UNLOCK
void
s_unlock(volatile slock_t *lock)
{
#ifdef TAS_ACTIVE_WORD
/* HP's PA-RISC */
*TAS_ACTIVE_WORD(lock) = -1;
#else
*lock = 0;
#endif
}
#endif
/*
* Wait while spinning on a contended spinlock.
*/
void
perform_spin_delay(SpinDelayStatus *status)
{
/* CPU-specific delay each time through the loop */
SPIN_DELAY();
/* Block the process every spins_per_delay tries */
if (++(status->spins) >= spins_per_delay)
{
if (++(status->delays) > NUM_DELAYS)
s_lock_stuck(status->ptr, status->file, status->line);
if (status->cur_delay == 0) /* first time to delay? */
status->cur_delay = MIN_DELAY_USEC;
pg_usleep(status->cur_delay);
#if defined(S_LOCK_TEST) #if defined(S_LOCK_TEST)
fprintf(stdout, "*"); fprintf(stdout, "*");
fflush(stdout); fflush(stdout);
#endif #endif
/* increase delay by a random fraction between 1X and 2X */ /* increase delay by a random fraction between 1X and 2X */
cur_delay += (int) (cur_delay * status->cur_delay += (int) (status->cur_delay *
((double) random() / (double) MAX_RANDOM_VALUE) + 0.5); ((double) random() / (double) MAX_RANDOM_VALUE) + 0.5);
/* wrap back to minimum delay when max is exceeded */ /* wrap back to minimum delay when max is exceeded */
if (cur_delay > MAX_DELAY_USEC) if (status->cur_delay > MAX_DELAY_USEC)
cur_delay = MIN_DELAY_USEC; status->cur_delay = MIN_DELAY_USEC;
spins = 0; status->spins = 0;
}
} }
}
/* /*
* If we were able to acquire the lock without delaying, it's a good * After acquiring a spinlock, update estimates about how long to loop.
* indication we are in a multiprocessor. If we had to delay, it's a sign *
* (but not a sure thing) that we are in a uniprocessor. Hence, we * If we were able to acquire the lock without delaying, it's a good
* decrement spins_per_delay slowly when we had to delay, and increase it * indication we are in a multiprocessor. If we had to delay, it's a sign
* rapidly when we didn't. It's expected that spins_per_delay will * (but not a sure thing) that we are in a uniprocessor. Hence, we
* converge to the minimum value on a uniprocessor and to the maximum * decrement spins_per_delay slowly when we had to delay, and increase it
* value on a multiprocessor. * rapidly when we didn't. It's expected that spins_per_delay will
* * converge to the minimum value on a uniprocessor and to the maximum
* Note: spins_per_delay is local within our current process. We want to * value on a multiprocessor.
* average these observations across multiple backends, since it's *
* relatively rare for this function to even get entered, and so a single * Note: spins_per_delay is local within our current process. We want to
* backend might not live long enough to converge on a good value. That * average these observations across multiple backends, since it's
* is handled by the two routines below. * relatively rare for this function to even get entered, and so a single
*/ * backend might not live long enough to converge on a good value. That
if (cur_delay == 0) * is handled by the two routines below.
*/
void
finish_spin_delay(SpinDelayStatus *status)
{
if (status->cur_delay == 0)
{ {
/* we never had to delay */ /* we never had to delay */
if (spins_per_delay < MAX_SPINS_PER_DELAY) if (spins_per_delay < MAX_SPINS_PER_DELAY)
...@@ -151,22 +181,8 @@ s_lock(volatile slock_t *lock, const char *file, int line) ...@@ -151,22 +181,8 @@ s_lock(volatile slock_t *lock, const char *file, int line)
if (spins_per_delay > MIN_SPINS_PER_DELAY) if (spins_per_delay > MIN_SPINS_PER_DELAY)
spins_per_delay = Max(spins_per_delay - 1, MIN_SPINS_PER_DELAY); spins_per_delay = Max(spins_per_delay - 1, MIN_SPINS_PER_DELAY);
} }
return delays;
} }
#ifdef USE_DEFAULT_S_UNLOCK
void
s_unlock(volatile slock_t *lock)
{
#ifdef TAS_ACTIVE_WORD
/* HP's PA-RISC */
*TAS_ACTIVE_WORD(lock) = -1;
#else
*lock = 0;
#endif
}
#endif
/* /*
* Set local copy of spins_per_delay during backend startup. * Set local copy of spins_per_delay during backend startup.
* *
......
...@@ -63,12 +63,15 @@ extern void ShmemBackendArrayAllocation(void); ...@@ -63,12 +63,15 @@ extern void ShmemBackendArrayAllocation(void);
#endif #endif
/* /*
* Note: MAX_BACKENDS is limited to 2^23-1 because inval.c stores the * Note: MAX_BACKENDS is limited to 2^18-1 because that's the width reserved
* backend ID as a 3-byte signed integer. Even if that limitation were * for buffer references in buf_internals.h. This limitation could be lifted
* removed, we still could not exceed INT_MAX/4 because some places compute * by using a 64bit state; but it's unlikely to be worthwhile as 2^18-1
* 4*MaxBackends without any overflow check. This is rechecked in the relevant * backends exceed currently realistic configurations. Even if that limitation
* GUC check hooks and in RegisterBackgroundWorker(). * were removed, we still could not a) exceed 2^23-1 because inval.c stores
* the backend ID as a 3-byte signed integer, b) INT_MAX/4 because some places
* compute 4*MaxBackends without any overflow check. This is rechecked in the
* relevant GUC check hooks and in RegisterBackgroundWorker().
*/ */
#define MAX_BACKENDS 0x7fffff #define MAX_BACKENDS 0x3FFFF
#endif /* _POSTMASTER_H */ #endif /* _POSTMASTER_H */
...@@ -21,29 +21,51 @@ ...@@ -21,29 +21,51 @@
#include "storage/lwlock.h" #include "storage/lwlock.h"
#include "storage/shmem.h" #include "storage/shmem.h"
#include "storage/smgr.h" #include "storage/smgr.h"
#include "port/atomics.h"
#include "storage/spin.h" #include "storage/spin.h"
#include "utils/relcache.h" #include "utils/relcache.h"
/*
 * Buffer state is a single 32-bit variable where the following data is combined.
*
* - 18 bits refcount
* - 4 bits usage count
* - 10 bits of flags
*
 * Combining these values allows performing some operations without locking
* the buffer header, by modifying them together with a CAS loop.
*
* The definition of buffer state components is below.
*/
#define BUF_REFCOUNT_ONE 1
#define BUF_REFCOUNT_MASK ((1U << 18) - 1)
#define BUF_USAGECOUNT_MASK 0x003C0000U
#define BUF_USAGECOUNT_ONE (1U << 18)
#define BUF_USAGECOUNT_SHIFT 18
#define BUF_FLAG_MASK 0xFFC00000U
/* Get refcount and usagecount from buffer state */
#define BUF_STATE_GET_REFCOUNT(state) ((state) & BUF_REFCOUNT_MASK)
#define BUF_STATE_GET_USAGECOUNT(state) (((state) & BUF_USAGECOUNT_MASK) >> BUF_USAGECOUNT_SHIFT)
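To make the layout concrete, here is a small illustrative example (values chosen arbitrarily, not from the source) of packing and unpacking a state word with these macros; BM_DIRTY is one of the flag bits redefined just below:

    uint32      buf_state;

    /* refcount 2, usage count 3, dirty */
    buf_state = 2 * BUF_REFCOUNT_ONE
        | 3 * BUF_USAGECOUNT_ONE
        | BM_DIRTY;

    Assert(BUF_STATE_GET_REFCOUNT(buf_state) == 2);
    Assert(BUF_STATE_GET_USAGECOUNT(buf_state) == 3);
    Assert((buf_state & BUF_FLAG_MASK) == BM_DIRTY);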
/* /*
* Flags for buffer descriptors * Flags for buffer descriptors
* *
* Note: TAG_VALID essentially means that there is a buffer hashtable * Note: TAG_VALID essentially means that there is a buffer hashtable
* entry associated with the buffer's tag. * entry associated with the buffer's tag.
*/ */
#define BM_DIRTY (1 << 0) /* data needs writing */ #define BM_LOCKED (1U << 22) /* buffer header is locked */
#define BM_VALID (1 << 1) /* data is valid */ #define BM_DIRTY (1U << 23) /* data needs writing */
#define BM_TAG_VALID (1 << 2) /* tag is assigned */ #define BM_VALID (1U << 24) /* data is valid */
#define BM_IO_IN_PROGRESS (1 << 3) /* read or write in progress */ #define BM_TAG_VALID (1U << 25) /* tag is assigned */
#define BM_IO_ERROR (1 << 4) /* previous I/O failed */ #define BM_IO_IN_PROGRESS (1U << 26) /* read or write in progress */
#define BM_JUST_DIRTIED (1 << 5) /* dirtied since write started */ #define BM_IO_ERROR (1U << 27) /* previous I/O failed */
#define BM_PIN_COUNT_WAITER (1 << 6) /* have waiter for sole pin */ #define BM_JUST_DIRTIED (1U << 28) /* dirtied since write started */
#define BM_CHECKPOINT_NEEDED (1 << 7) /* must write for checkpoint */ #define BM_PIN_COUNT_WAITER (1U << 29) /* have waiter for sole pin */
#define BM_PERMANENT (1 << 8) /* permanent relation (not #define BM_CHECKPOINT_NEEDED (1U << 30) /* must write for checkpoint */
#define BM_PERMANENT (1U << 31) /* permanent relation (not
* unlogged) */ * unlogged) */
typedef bits16 BufFlags;
/* /*
* The maximum allowed value of usage_count represents a tradeoff between * The maximum allowed value of usage_count represents a tradeoff between
* accuracy and speed of the clock-sweep buffer management algorithm. A * accuracy and speed of the clock-sweep buffer management algorithm. A
...@@ -113,18 +135,29 @@ typedef struct buftag ...@@ -113,18 +135,29 @@ typedef struct buftag
/* /*
* BufferDesc -- shared descriptor/state data for a single shared buffer. * BufferDesc -- shared descriptor/state data for a single shared buffer.
* *
* Note: buf_hdr_lock must be held to examine or change the tag, flags, * Note: Buffer header lock (BM_LOCKED flag) must be held to examine or change
* usage_count, refcount, or wait_backend_pid fields. buf_id field never * the tag, state or wait_backend_pid fields. In general, buffer header lock
* changes after initialization, so does not need locking. freeNext is * is a spinlock which is combined with flags, refcount and usagecount into
 * protected by the buffer_strategy_lock not buf_hdr_lock. The LWLock can * a single atomic variable. This layout allows us to do some operations in a
 * take care of itself. The buf_hdr_lock is *not* used to control access to * single atomic operation, without actually acquiring and releasing the spinlock;
* the data in the buffer! * for instance, increase or decrease refcount. buf_id field never changes
* after initialization, so does not need locking. freeNext is protected by
* the buffer_strategy_lock not buffer header lock. The LWLock can take care
* of itself. The buffer header lock is *not* used to control access to the
* data in the buffer!
*
 * It's assumed that nobody changes the state field while the buffer header
 * lock is held. Thus the buffer header lock holder can do complex updates of
 * the state variable in a single write, simultaneously with the lock release
 * (clearing the BM_LOCKED flag). On the other hand, updating the state
 * without holding the buffer header lock is restricted to CAS, which ensures
 * that the BM_LOCKED flag is not set. Atomic increment/decrement, OR/AND,
 * etc. are not allowed.
* *
* An exception is that if we have the buffer pinned, its tag can't change * An exception is that if we have the buffer pinned, its tag can't change
* underneath us, so we can examine the tag without locking the spinlock. * underneath us, so we can examine the tag without locking the buffer header.
* Also, in places we do one-time reads of the flags without bothering to * Also, in places we do one-time reads of the flags without bothering to
* lock the spinlock; this is generally for situations where we don't expect * lock the buffer header; this is generally for situations where we don't
* the flag bit being tested to be changing. * expect the flag bit being tested to be changing.
* *
* We can't physically remove items from a disk page if another backend has * We can't physically remove items from a disk page if another backend has
* the buffer pinned. Hence, a backend may need to wait for all other pins * the buffer pinned. Hence, a backend may need to wait for all other pins
...@@ -142,13 +175,12 @@ typedef struct buftag ...@@ -142,13 +175,12 @@ typedef struct buftag
typedef struct BufferDesc typedef struct BufferDesc
{ {
BufferTag tag; /* ID of page contained in buffer */ BufferTag tag; /* ID of page contained in buffer */
BufFlags flags; /* see bit definitions above */
uint8 usage_count; /* usage counter for clock sweep code */
slock_t buf_hdr_lock; /* protects a subset of fields, see above */
unsigned refcount; /* # of backends holding pins on buffer */
int wait_backend_pid; /* backend PID of pin-count waiter */
int buf_id; /* buffer's index number (from 0) */ int buf_id; /* buffer's index number (from 0) */
/* state of the tag, containing flags, refcount and usagecount */
pg_atomic_uint32 state;
int wait_backend_pid; /* backend PID of pin-count waiter */
int freeNext; /* link in freelist chain */ int freeNext; /* link in freelist chain */
LWLock content_lock; /* to lock access to buffer contents */ LWLock content_lock; /* to lock access to buffer contents */
...@@ -202,11 +234,15 @@ extern PGDLLIMPORT LWLockMinimallyPadded *BufferIOLWLockArray; ...@@ -202,11 +234,15 @@ extern PGDLLIMPORT LWLockMinimallyPadded *BufferIOLWLockArray;
#define FREENEXT_NOT_IN_LIST (-2) #define FREENEXT_NOT_IN_LIST (-2)
/* /*
* Macros for acquiring/releasing a shared buffer header's spinlock. * Functions for acquiring/releasing a shared buffer header's spinlock. Do
* Do not apply these to local buffers! * not apply these to local buffers!
*/ */
#define LockBufHdr(bufHdr) SpinLockAcquire(&(bufHdr)->buf_hdr_lock) extern uint32 LockBufHdr(BufferDesc *desc);
#define UnlockBufHdr(bufHdr) SpinLockRelease(&(bufHdr)->buf_hdr_lock) #define UnlockBufHdr(desc, s) \
do { \
pg_atomic_write_u32(&(desc)->state, (s) & (~BM_LOCKED)); \
pg_write_barrier(); \
} while (0)
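The acquire side is now a function in bufmgr.c rather than a macro. A sketch of what it plausibly looks like, consistent with the declarations above and the SpinDelayStatus API (illustrative, not a verbatim copy of the committed code):

    uint32
    LockBufHdr(BufferDesc *desc)
    {
        SpinDelayStatus delayStatus = init_spin_delay(desc);
        uint32      old_buf_state;

        while (true)
        {
            /* try to set the BM_LOCKED flag atomically */
            old_buf_state = pg_atomic_fetch_or_u32(&desc->state, BM_LOCKED);
            /* if it wasn't already set, we now hold the header lock */
            if (!(old_buf_state & BM_LOCKED))
                break;
            /* otherwise back off, then retry */
            perform_spin_delay(&delayStatus);
        }
        finish_spin_delay(&delayStatus);
        return old_buf_state | BM_LOCKED;
    }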
/* /*
...@@ -267,7 +303,8 @@ extern void IssuePendingWritebacks(WritebackContext *context); ...@@ -267,7 +303,8 @@ extern void IssuePendingWritebacks(WritebackContext *context);
extern void ScheduleBufferTagForWriteback(WritebackContext *context, BufferTag *tag); extern void ScheduleBufferTagForWriteback(WritebackContext *context, BufferTag *tag);
/* freelist.c */ /* freelist.c */
extern BufferDesc *StrategyGetBuffer(BufferAccessStrategy strategy); extern BufferDesc *StrategyGetBuffer(BufferAccessStrategy strategy,
uint32 *buf_state);
extern void StrategyFreeBuffer(BufferDesc *buf); extern void StrategyFreeBuffer(BufferDesc *buf);
extern bool StrategyRejectBuffer(BufferAccessStrategy strategy, extern bool StrategyRejectBuffer(BufferAccessStrategy strategy,
BufferDesc *buf); BufferDesc *buf);
......
...@@ -991,4 +991,22 @@ extern int s_lock(volatile slock_t *lock, const char *file, int line); ...@@ -991,4 +991,22 @@ extern int s_lock(volatile slock_t *lock, const char *file, int line);
extern void set_spins_per_delay(int shared_spins_per_delay); extern void set_spins_per_delay(int shared_spins_per_delay);
extern int update_spins_per_delay(int shared_spins_per_delay); extern int update_spins_per_delay(int shared_spins_per_delay);
/*
* Support for spin delay which is useful in various places where
* spinlock-like procedures take place.
*/
typedef struct
{
int spins;
int delays;
int cur_delay;
void *ptr;
const char *file;
int line;
} SpinDelayStatus;
#define init_spin_delay(ptr) {0, 0, 0, (ptr), __FILE__, __LINE__}
void perform_spin_delay(SpinDelayStatus *status);
void finish_spin_delay(SpinDelayStatus *status);
#endif /* S_LOCK_H */ #endif /* S_LOCK_H */
...@@ -1859,6 +1859,7 @@ SpGistScanOpaqueData ...@@ -1859,6 +1859,7 @@ SpGistScanOpaqueData
SpGistState SpGistState
SpGistTypeDesc SpGistTypeDesc
SpecialJoinInfo SpecialJoinInfo
SpinDelayStatus
SplitInterval SplitInterval
SplitLR SplitLR
SplitVar SplitVar
......