Commit 2d115e47 authored by Andres Freund's avatar Andres Freund

Fix various shortcomings of the new PrivateRefCount infrastructure.

As noted by Tom Lane the improvements in 4b4b680c had the problem
that in some situations we searched, entered and modified entries in
the private refcount hash while holding a spinlock. I had tried to
keep the logic entirely local to PinBuffer_Locked(), but that's not
really possible given it's called with a spinlock held...

Besides being disadvantageous from a performance point of view, this
also has problems with error handling safety. If we failed inserting
an entry into the hashtable due to an out of memory error, we'd error
out with a held spinlock. Not good.

Change the way private refcounts are manipulated: Before a buffer can
be tracked an entry has to be reserved using
ReservePrivateRefCountEntry(); then, if a entry is not found using
GetPrivateRefCountEntry(), it can be entered with
NewPrivateRefCountEntry().

Also take advantage of the fact that PinBuffer_Locked() currently is
never called for buffers that already have been pinned by the current
backend and don't search the private refcount entries for preexisting
local pins. That results in a small, but measurable, performance
improvement.

Additionally make ReleaseBuffer() always call UnpinBuffer() for shared
buffers. That avoids duplicating work in an eventual UnpinBuffer()
call that already has been done in ReleaseBuffer() and also saves some
code.

Per discussion with Tom Lane.

Discussion: 15028.1418772313@sss.pgh.pa.us
parent 4ea51cdf
......@@ -106,7 +106,7 @@ static volatile BufferDesc *PinCountWaitBuf = NULL;
*
*
* To avoid - as we used to - requiring an array with NBuffers entries to keep
* track of local buffers we use a small sequentially searched array
* track of local buffers, we use a small sequentially searched array
* (PrivateRefCountArray) and a overflow hash table (PrivateRefCountHash) to
* keep track of backend local pins.
*
......@@ -117,40 +117,129 @@ static volatile BufferDesc *PinCountWaitBuf = NULL;
*
* Note that in most scenarios the number of pinned buffers will not exceed
* REFCOUNT_ARRAY_ENTRIES.
*
*
* To enter a buffer into the refcount tracking mechanism first reserve a free
* entry using ReservePrivateRefCountEntry() and then later, if necessary,
* fill it with NewPrivateRefCountEntry(). That split lets us avoid doing
* memory allocations in NewPrivateRefCountEntry() which can be important
* because in some scenarios it's called with a spinlock held...
*/
static struct PrivateRefCountEntry PrivateRefCountArray[REFCOUNT_ARRAY_ENTRIES];
static HTAB *PrivateRefCountHash = NULL;
static int32 PrivateRefCountOverflowed = 0;
static uint32 PrivateRefCountClock = 0;
static PrivateRefCountEntry *ReservedRefCountEntry = NULL;
static PrivateRefCountEntry* GetPrivateRefCountEntry(Buffer buffer, bool create, bool do_move);
static void ReservePrivateRefCountEntry(void);
static PrivateRefCountEntry* NewPrivateRefCountEntry(Buffer buffer);
static PrivateRefCountEntry* GetPrivateRefCountEntry(Buffer buffer, bool do_move);
static inline int32 GetPrivateRefCount(Buffer buffer);
static void ForgetPrivateRefCountEntry(PrivateRefCountEntry *ref);
/*
* Ensure that the the PrivateRefCountArray has sufficient space to store one
* more entry. This has to be called before using NewPrivateRefCountEntry() to
* fill a new entry - but it's perfectly fine to not use a reserved entry.
*/
static void
ReservePrivateRefCountEntry(void)
{
/* Already reserved (or freed), nothing to do */
if (ReservedRefCountEntry != NULL)
return;
/*
* First search for a free entry the array, that'll be sufficient in the
* majority of cases.
*/
{
int i;
for (i = 0; i < REFCOUNT_ARRAY_ENTRIES; i++)
{
PrivateRefCountEntry *res;
res = &PrivateRefCountArray[i];
if (res->buffer == InvalidBuffer)
{
ReservedRefCountEntry = res;
return;
}
}
}
/*
* No luck. All array entries are full. Move one array entry into the hash
* table.
*/
{
/*
* Move entry from the current clock position in the array into the
* hashtable. Use that slot.
*/
PrivateRefCountEntry *hashent;
bool found;
/* select victim slot */
ReservedRefCountEntry =
&PrivateRefCountArray[PrivateRefCountClock++ % REFCOUNT_ARRAY_ENTRIES];
/* Better be used, otherwise we shouldn't get here. */
Assert(ReservedRefCountEntry->buffer != InvalidBuffer);
/* enter victim array entry into hashtable */
hashent = hash_search(PrivateRefCountHash,
(void *) &(ReservedRefCountEntry->buffer),
HASH_ENTER,
&found);
Assert(!found);
hashent->refcount = ReservedRefCountEntry->refcount;
/* clear the now free array slot */
ReservedRefCountEntry->buffer = InvalidBuffer;
ReservedRefCountEntry->refcount = 0;
PrivateRefCountOverflowed++;
}
}
/*
* Fill a previously reserved refcount entry.
*/
static PrivateRefCountEntry*
NewPrivateRefCountEntry(Buffer buffer)
{
PrivateRefCountEntry *res;
/* only allowed to be called when a reservation has been made */
Assert(ReservedRefCountEntry != NULL);
/* use up the reserved entry */
res = ReservedRefCountEntry;
ReservedRefCountEntry = NULL;
/* and fill it */
res->buffer = buffer;
res->refcount = 0;
return res;
}
/*
* Return the PrivateRefCount entry for the passed buffer.
*
* Returns NULL if create = false is passed and the buffer doesn't have a
* PrivateRefCount entry; allocates a new PrivateRefCountEntry if currently
* none exists and create = true is passed.
*
* If do_move is true - only allowed for create = false - the entry is
* optimized for frequent access.
*
* When a returned refcount entry isn't used anymore it has to be forgotten,
* using ForgetPrivateRefCountEntry().
*
* Only works for shared buffers.
* Returns NULL if a buffer doesn't have a refcount entry. Otherwise, if
* do_move is true, and the entry resides in the hashtable the entry is
* optimized for frequent access by moving it to the array.
*/
static PrivateRefCountEntry*
GetPrivateRefCountEntry(Buffer buffer, bool create, bool do_move)
GetPrivateRefCountEntry(Buffer buffer, bool do_move)
{
PrivateRefCountEntry *res;
PrivateRefCountEntry *free = NULL;
bool found = false;
int i;
Assert(!create || do_move);
Assert(BufferIsValid(buffer));
Assert(!BufferIsLocal(buffer));
......@@ -164,144 +253,60 @@ GetPrivateRefCountEntry(Buffer buffer, bool create, bool do_move)
if (res->buffer == buffer)
return res;
/* Remember where to put a new refcount, should it become necessary. */
if (free == NULL && res->buffer == InvalidBuffer)
free = res;
}
/*
* By here we know that the buffer, if already pinned, isn't residing in
* the array.
*
* Only look up the buffer in the hashtable if we've previously overflowed
* into it.
*/
res = NULL;
found = false;
if (PrivateRefCountOverflowed == 0)
return NULL;
/*
* Look up the buffer in the hashtable if we've previously overflowed into
* it.
*/
if (PrivateRefCountOverflowed > 0)
res = hash_search(PrivateRefCountHash,
(void *) &buffer,
HASH_FIND,
NULL);
if (res == NULL)
return NULL;
else if (!do_move)
{
res = hash_search(PrivateRefCountHash,
(void *) &buffer,
HASH_FIND,
&found);
/* caller doesn't want us to move the hash entry into the array */
return res;
}
if (!found)
else
{
if (!create)
{
/* Neither array nor hash have an entry and no new entry is needed */
return NULL;
}
else if (free != NULL)
{
/* add entry into the free array slot */
free->buffer = buffer;
free->refcount = 0;
return free;
}
else
{
/*
* Move entry from the current clock position in the array into the
* hashtable. Use that slot.
*/
PrivateRefCountEntry *arrayent;
PrivateRefCountEntry *hashent;
/* select victim slot */
arrayent = &PrivateRefCountArray[
PrivateRefCountClock++ % REFCOUNT_ARRAY_ENTRIES];
Assert(arrayent->buffer != InvalidBuffer);
/* move buffer from hashtable into the free array slot */
bool found;
PrivateRefCountEntry *free;
/* enter victim array entry into hashtable */
hashent = hash_search(PrivateRefCountHash,
(void *) &arrayent->buffer,
HASH_ENTER,
&found);
Assert(!found);
hashent->refcount = arrayent->refcount;
/* Ensure there's a free array slot */
ReservePrivateRefCountEntry();
/* fill the now free array slot */
arrayent->buffer = buffer;
arrayent->refcount = 0;
/* Use up the reserved slot */
Assert(ReservedRefCountEntry != NULL);
free = ReservedRefCountEntry;
ReservedRefCountEntry = NULL;
Assert(free->buffer == InvalidBuffer);
PrivateRefCountOverflowed++;
/* and fill it */
free->buffer = buffer;
free->refcount = res->refcount;
return arrayent;
/* delete from hashtable */
hash_search(PrivateRefCountHash,
(void *) &buffer,
HASH_REMOVE,
&found);
Assert(found);
Assert(PrivateRefCountOverflowed > 0);
PrivateRefCountOverflowed--;
}
return free;
}
else
{
if (!do_move)
{
return res;
}
else if (found && free != NULL)
{
/* move buffer from hashtable into the free array slot */
/* fill array slot */
free->buffer = buffer;
free->refcount = res->refcount;
/* delete from hashtable */
hash_search(PrivateRefCountHash,
(void *) &buffer,
HASH_REMOVE,
&found);
Assert(found);
Assert(PrivateRefCountOverflowed > 0);
PrivateRefCountOverflowed--;
return free;
}
else
{
/*
* Swap the entry in the hash table with the one in the array at the
* current clock position.
*/
PrivateRefCountEntry *arrayent;
PrivateRefCountEntry *hashent;
/* select victim slot */
arrayent = &PrivateRefCountArray[
PrivateRefCountClock++ % REFCOUNT_ARRAY_ENTRIES];
Assert(arrayent->buffer != InvalidBuffer);
/* enter victim entry into the hashtable */
hashent = hash_search(PrivateRefCountHash,
(void *) &arrayent->buffer,
HASH_ENTER,
&found);
Assert(!found);
hashent->refcount = arrayent->refcount;
/* fill now free array entry with previously searched entry */
arrayent->buffer = res->buffer;
arrayent->refcount = res->refcount;
/* and remove the old entry */
hash_search(PrivateRefCountHash,
(void *) &arrayent->buffer,
HASH_REMOVE,
&found);
Assert(found);
/* PrivateRefCountOverflowed stays the same -1 + +1 = 0*/
return arrayent;
}
}
Assert(false); /* unreachable */
return NULL;
}
/*
......@@ -317,7 +322,11 @@ GetPrivateRefCount(Buffer buffer)
Assert(BufferIsValid(buffer));
Assert(!BufferIsLocal(buffer));
ref = GetPrivateRefCountEntry(buffer, false, false);
/*
* Not moving the entry - that's ok for the current users, but we might
* want to change this one day.
*/
ref = GetPrivateRefCountEntry(buffer, false);
if (ref == NULL)
return 0;
......@@ -337,6 +346,12 @@ ForgetPrivateRefCountEntry(PrivateRefCountEntry *ref)
ref < &PrivateRefCountArray[REFCOUNT_ARRAY_ENTRIES])
{
ref->buffer = InvalidBuffer;
/*
* Mark the just used entry as reserved - in many scenarios that
* allows us to avoid ever having to search the array/hash for free
* entries.
*/
ReservedRefCountEntry = ref;
}
else
{
......@@ -923,6 +938,12 @@ BufferAlloc(SMgrRelation smgr, char relpersistence, ForkNumber forkNum,
/* Loop here in case we have to try another victim buffer */
for (;;)
{
/*
* Ensure, while the spinlock's not yet held, that there's a free refcount
* entry.
*/
ReservePrivateRefCountEntry();
/*
* Select a victim buffer. The buffer is returned with its header
* spinlock still held!
......@@ -1407,10 +1428,13 @@ PinBuffer(volatile BufferDesc *buf, BufferAccessStrategy strategy)
bool result;
PrivateRefCountEntry *ref;
ref = GetPrivateRefCountEntry(b + 1, true, true);
ref = GetPrivateRefCountEntry(b + 1, true);
if (ref->refcount == 0)
if (ref == NULL)
{
ReservePrivateRefCountEntry();
ref = NewPrivateRefCountEntry(b + 1);
LockBufHdr(buf);
buf->refcount++;
if (strategy == NULL)
......@@ -1443,11 +1467,19 @@ PinBuffer(volatile BufferDesc *buf, BufferAccessStrategy strategy)
* PinBuffer_Locked -- as above, but caller already locked the buffer header.
* The spinlock is released before return.
*
* As this function is called with the spinlock held, the caller has to
* previously call ReservePrivateRefCountEntry().
*
* Currently, no callers of this function want to modify the buffer's
* usage_count at all, so there's no need for a strategy parameter.
* Also we don't bother with a BM_VALID test (the caller could check that for
* itself).
*
* Also all callers only ever use this function when it's known that the
* buffer can't have a preexisting pin by this backend. That allows us to skip
* searching the private refcount array & hash, which is a boon, because the
* spinlock is still held.
*
* Note: use of this routine is frequently mandatory, not just an optimization
* to save a spin lock/unlock cycle, because we need to pin a buffer before
* its state can change under us.
......@@ -1458,13 +1490,18 @@ PinBuffer_Locked(volatile BufferDesc *buf)
int b = buf->buf_id;
PrivateRefCountEntry *ref;
ref = GetPrivateRefCountEntry(b + 1, true, true);
/*
* As explained, We don't expect any preexisting pins. That allows us to
* manipulate the PrivateRefCount after releasing the spinlock
*/
Assert(GetPrivateRefCountEntry(b + 1, false) == NULL);
if (ref->refcount == 0)
buf->refcount++;
buf->refcount++;
UnlockBufHdr(buf);
ref = NewPrivateRefCountEntry(b + 1);
ref->refcount++;
Assert(ref->refcount > 0);
ResourceOwnerRememberBuffer(CurrentResourceOwner,
BufferDescriptorGetBuffer(buf));
}
......@@ -1481,9 +1518,9 @@ static void
UnpinBuffer(volatile BufferDesc *buf, bool fixOwner)
{
PrivateRefCountEntry *ref;
int b = buf->buf_id;
ref = GetPrivateRefCountEntry(b + 1, false, false);
/* not moving as we're likely deleting it soon anyway */
ref = GetPrivateRefCountEntry(buf->buf_id + 1, false);
Assert(ref != NULL);
if (fixOwner)
......@@ -1982,6 +2019,8 @@ SyncOneBuffer(int buf_id, bool skip_recently_used)
volatile BufferDesc *bufHdr = &BufferDescriptors[buf_id];
int result = 0;
ReservePrivateRefCountEntry();
/*
* Check whether buffer needs writing.
*
......@@ -2812,6 +2851,8 @@ FlushRelationBuffers(Relation rel)
if (!RelFileNodeEquals(bufHdr->tag.rnode, rel->rd_node))
continue;
ReservePrivateRefCountEntry();
LockBufHdr(bufHdr);
if (RelFileNodeEquals(bufHdr->tag.rnode, rel->rd_node) &&
(bufHdr->flags & BM_VALID) && (bufHdr->flags & BM_DIRTY))
......@@ -2862,6 +2903,8 @@ FlushDatabaseBuffers(Oid dbid)
if (bufHdr->tag.rnode.dbNode != dbid)
continue;
ReservePrivateRefCountEntry();
LockBufHdr(bufHdr);
if (bufHdr->tag.rnode.dbNode == dbid &&
(bufHdr->flags & BM_VALID) && (bufHdr->flags & BM_DIRTY))
......@@ -2883,31 +2926,19 @@ FlushDatabaseBuffers(Oid dbid)
void
ReleaseBuffer(Buffer buffer)
{
volatile BufferDesc *bufHdr;
PrivateRefCountEntry *ref;
if (!BufferIsValid(buffer))
elog(ERROR, "bad buffer ID: %d", buffer);
ResourceOwnerForgetBuffer(CurrentResourceOwner, buffer);
if (BufferIsLocal(buffer))
{
ResourceOwnerForgetBuffer(CurrentResourceOwner, buffer);
Assert(LocalRefCount[-buffer - 1] > 0);
LocalRefCount[-buffer - 1]--;
return;
}
bufHdr = &BufferDescriptors[buffer - 1];
ref = GetPrivateRefCountEntry(buffer, false, false);
Assert(ref != NULL);
Assert(ref->refcount > 0);
if (ref->refcount > 1)
ref->refcount--;
else
UnpinBuffer(bufHdr, false);
UnpinBuffer(&BufferDescriptors[buffer - 1], true);
}
/*
......@@ -2941,7 +2972,7 @@ IncrBufferRefCount(Buffer buffer)
else
{
PrivateRefCountEntry *ref;
ref = GetPrivateRefCountEntry(buffer, false, true);
ref = GetPrivateRefCountEntry(buffer, true);
Assert(ref != NULL);
ref->refcount++;
}
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment