Commit eedb7d18 authored by Tom Lane's avatar Tom Lane

Modify RelationGetBufferForTuple() so that we only do lseek and lock

when we need to move to a new page; as long as we can insert the new
tuple on the same page as before, we only need LockBuffer and not the
expensive stuff.  Also, twiddle bufmgr interfaces to avoid redundant
lseeks in RelationGetBufferForTuple and BufferAlloc.  Successive inserts
now require one lseek per page added, rather than one per tuple with
several additional ones at each page boundary as happened before.
Lock contention when multiple backends are inserting in same table
is also greatly reduced.
parent d9f55edc
......@@ -8,7 +8,7 @@
*
*
* IDENTIFICATION
* $Header: /cvsroot/pgsql/src/backend/access/heap/heapam.c,v 1.113 2001/03/25 23:23:58 tgl Exp $
* $Header: /cvsroot/pgsql/src/backend/access/heap/heapam.c,v 1.114 2001/05/12 19:58:27 tgl Exp $
*
*
* INTERFACE ROUTINES
......@@ -487,7 +487,7 @@ heapgettup(Relation relation,
return;
}
*buffer = ReleaseAndReadBuffer(*buffer, relation, page);
*buffer = ReleaseAndReadBuffer(*buffer, relation, page, false);
if (!BufferIsValid(*buffer))
elog(ERROR, "heapgettup: failed ReadBuffer");
......
......@@ -8,7 +8,7 @@
*
*
* IDENTIFICATION
* $Id: hio.c,v 1.37 2001/03/22 06:16:07 momjian Exp $
* $Id: hio.c,v 1.38 2001/05/12 19:58:27 tgl Exp $
*
*-------------------------------------------------------------------------
*/
......@@ -66,7 +66,7 @@ RelationPutHeapTuple(Relation relation,
/*
* RelationGetBufferForTuple
*
* Returns (locked) buffer with free space >= given len.
* Returns exclusive-locked buffer with free space >= given len.
*
* Note that we use LockPage to lock relation for extension. We can
* do this as long as in all other places we use page-level locking
......@@ -75,14 +75,14 @@ RelationPutHeapTuple(Relation relation,
*
* ELOG(ERROR) is allowed here, so this routine *must* be called
* before any (unlogged) changes are made in buffer pool.
*
*/
Buffer
RelationGetBufferForTuple(Relation relation, Size len)
{
Buffer buffer;
Buffer buffer = InvalidBuffer;
Page pageHeader;
BlockNumber lastblock;
BlockNumber lastblock,
oldnblocks;
len = MAXALIGN(len); /* be conservative */
......@@ -93,59 +93,102 @@ RelationGetBufferForTuple(Relation relation, Size len)
elog(ERROR, "Tuple is too big: size %lu, max size %ld",
(unsigned long) len, MaxTupleSize);
if (!relation->rd_myxactonly)
LockPage(relation, 0, ExclusiveLock);
/*
* XXX This does an lseek - VERY expensive - but at the moment it is
* the only way to accurately determine how many blocks are in a
* relation. A good optimization would be to get this to actually
* work properly.
* First, use relcache's record of table length to guess where the
* last page is, and try to put the tuple there. This cached value
* may be out of date, in which case we'll be inserting into a non-last
* page, but that should be OK. Note that in a newly created relcache
* entry, rd_nblocks may be zero; if so, we'll set it correctly below.
*/
lastblock = RelationGetNumberOfBlocks(relation);
/*
* Get the last existing page --- may need to create the first one if
* this is a virgin relation.
*/
if (lastblock == 0)
{
buffer = ReadBuffer(relation, P_NEW);
LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE);
pageHeader = (Page) BufferGetPage(buffer);
Assert(PageIsNew((PageHeader) pageHeader));
PageInit(pageHeader, BufferGetPageSize(buffer), 0);
}
else
if (relation->rd_nblocks > 0)
{
buffer = ReadBuffer(relation, lastblock - 1);
lastblock = relation->rd_nblocks - 1;
buffer = ReadBuffer(relation, lastblock);
LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE);
pageHeader = (Page) BufferGetPage(buffer);
if (len <= PageGetFreeSpace(pageHeader))
return buffer;
/*
* Doesn't fit, so we'll have to try someplace else.
*/
LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
/* buffer release will happen below... */
}
/*
* Is there room on the last existing page?
* Before extending relation, make sure no one else has done
* so more recently than our last rd_nblocks update. (If we
* blindly extend the relation here, then probably most of the
* page the other guy added will end up going to waste.)
*
* We have to use a lock to ensure no one else is extending the
* rel at the same time, else we will both try to initialize the
* same new page.
*/
if (len > PageGetFreeSpace(pageHeader))
if (!relation->rd_myxactonly)
LockPage(relation, 0, ExclusiveLock);
oldnblocks = relation->rd_nblocks;
/*
* XXX This does an lseek - rather expensive - but at the moment it is
* the only way to accurately determine how many blocks are in a
* relation. Is it worth keeping an accurate file length in shared
* memory someplace, rather than relying on the kernel to do it for us?
*/
relation->rd_nblocks = RelationGetNumberOfBlocks(relation);
if (relation->rd_nblocks > oldnblocks)
{
LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
buffer = ReleaseAndReadBuffer(buffer, relation, P_NEW);
/*
* Someone else has indeed extended the relation recently.
* Try to fit our tuple into the new last page.
*/
lastblock = relation->rd_nblocks - 1;
buffer = ReleaseAndReadBuffer(buffer, relation, lastblock, false);
LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE);
pageHeader = (Page) BufferGetPage(buffer);
Assert(PageIsNew((PageHeader) pageHeader));
PageInit(pageHeader, BufferGetPageSize(buffer), 0);
if (len > PageGetFreeSpace(pageHeader))
if (len <= PageGetFreeSpace(pageHeader))
{
/* We should not get here given the test at the top */
elog(STOP, "Tuple is too big: size %lu",
(unsigned long) len);
/* OK, we don't need to extend again. */
if (!relation->rd_myxactonly)
UnlockPage(relation, 0, ExclusiveLock);
return buffer;
}
/*
* Doesn't fit, so we'll have to extend the relation (again).
*/
LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
/* buffer release will happen below... */
}
/*
* Extend the relation by one page and update rd_nblocks for next time.
*/
lastblock = relation->rd_nblocks;
buffer = ReleaseAndReadBuffer(buffer, relation, lastblock, true);
relation->rd_nblocks = lastblock + 1;
/*
* We need to initialize the empty new page.
*/
LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE);
pageHeader = (Page) BufferGetPage(buffer);
Assert(PageIsNew((PageHeader) pageHeader));
PageInit(pageHeader, BufferGetPageSize(buffer), 0);
/*
* Release the file-extension lock; it's now OK for someone else
* to extend the relation some more.
*/
if (!relation->rd_myxactonly)
UnlockPage(relation, 0, ExclusiveLock);
return (buffer);
if (len > PageGetFreeSpace(pageHeader))
{
/* We should not get here given the test at the top */
elog(STOP, "Tuple is too big: size %lu",
(unsigned long) len);
}
return buffer;
}
......@@ -8,7 +8,7 @@
*
*
* IDENTIFICATION
* $Header: /cvsroot/pgsql/src/backend/storage/buffer/bufmgr.c,v 1.110 2001/05/10 20:38:49 tgl Exp $
* $Header: /cvsroot/pgsql/src/backend/storage/buffer/bufmgr.c,v 1.111 2001/05/12 19:58:27 tgl Exp $
*
*-------------------------------------------------------------------------
*/
......@@ -88,10 +88,10 @@ extern void AbortBufferIO(void);
*/
#define BUFFER_IS_BROKEN(buf) ((buf->flags & BM_IO_ERROR) && !(buf->flags & BM_DIRTY))
static Buffer ReadBufferWithBufferLock(Relation relation, BlockNumber blockNum,
bool bufferLockHeld);
static Buffer ReadBufferInternal(Relation reln, BlockNumber blockNum,
bool isExtend, bool bufferLockHeld);
static BufferDesc *BufferAlloc(Relation reln, BlockNumber blockNum,
bool *foundPtr, bool bufferLockHeld);
bool *foundPtr);
static int ReleaseBufferWithBufferLock(Buffer buffer);
static int BufferReplace(BufferDesc *bufHdr);
void PrintBufferDescs(void);
......@@ -121,7 +121,7 @@ RelationGetBufferWithBuffer(Relation relation,
SpinRelease(BufMgrLock);
return buffer;
}
return ReadBufferWithBufferLock(relation, blockNumber, true);
return ReadBufferInternal(relation, blockNumber, false, true);
}
else
{
......@@ -131,7 +131,7 @@ RelationGetBufferWithBuffer(Relation relation,
return buffer;
}
}
return ReadBuffer(relation, blockNumber);
return ReadBufferInternal(relation, blockNumber, false, false);
}
/*
......@@ -152,38 +152,44 @@ RelationGetBufferWithBuffer(Relation relation,
/*
* ReadBuffer
*
*/
Buffer
ReadBuffer(Relation reln, BlockNumber blockNum)
{
return ReadBufferWithBufferLock(reln, blockNum, false);
return ReadBufferInternal(reln, blockNum, false, false);
}
/*
* ReadBufferWithBufferLock -- does the work of
* ReadBuffer() but with the possibility that
* the buffer lock has already been held. this
* is yet another effort to reduce the number of
* semops in the system.
* ReadBufferInternal -- internal version of ReadBuffer with more options
*
* isExtend: if true, assume that we are extending the file and the caller
* is passing the current EOF block number (ie, caller already called
* smgrnblocks()).
*
* bufferLockHeld: if true, caller already acquired the bufmgr spinlock.
* (This is assumed never to be true if dealing with a local buffer!)
*/
static Buffer
ReadBufferWithBufferLock(Relation reln,
BlockNumber blockNum,
bool bufferLockHeld)
ReadBufferInternal(Relation reln, BlockNumber blockNum,
bool isExtend, bool bufferLockHeld)
{
BufferDesc *bufHdr;
int status;
bool found;
bool extend; /* extending the file by one block */
bool isLocalBuf;
extend = (blockNum == P_NEW);
isLocalBuf = reln->rd_myxactonly;
if (isLocalBuf)
{
ReadLocalBufferCount++;
/* Substitute proper block number if caller asked for P_NEW */
if (blockNum == P_NEW)
{
blockNum = reln->rd_nblocks;
reln->rd_nblocks++;
isExtend = true;
}
bufHdr = LocalBufferAlloc(reln, blockNum, &found);
if (found)
LocalBufferHitCount++;
......@@ -191,16 +197,25 @@ ReadBufferWithBufferLock(Relation reln,
else
{
ReadBufferCount++;
/* Substitute proper block number if caller asked for P_NEW */
if (blockNum == P_NEW)
{
blockNum = smgrnblocks(DEFAULT_SMGR, reln);
isExtend = true;
}
/*
* lookup the buffer. IO_IN_PROGRESS is set if the requested
* block is not currently in memory.
*/
bufHdr = BufferAlloc(reln, blockNum, &found, bufferLockHeld);
if (!bufferLockHeld)
SpinAcquire(BufMgrLock);
bufHdr = BufferAlloc(reln, blockNum, &found);
if (found)
BufferHitCount++;
}
/* At this point we do NOT hold the bufmgr spinlock. */
if (!bufHdr)
return InvalidBuffer;
......@@ -208,11 +223,11 @@ ReadBufferWithBufferLock(Relation reln,
if (found)
{
/*
* Could see found && extend if a buffer was already created for
* Could have found && isExtend if a buffer was already created for
* the next page position, but then smgrextend failed to write
* the page. Must fall through and try to extend file again.
*/
if (!extend)
if (!isExtend)
return BufferDescriptorGetBuffer(bufHdr);
}
......@@ -220,16 +235,16 @@ ReadBufferWithBufferLock(Relation reln,
* if we have gotten to this point, the reln pointer must be ok and
* the relation file must be open.
*/
if (extend)
if (isExtend)
{
/* new buffers are zero-filled */
MemSet((char *) MAKE_PTR(bufHdr->data), 0, BLCKSZ);
status = smgrextend(DEFAULT_SMGR, reln, bufHdr->tag.blockNum,
status = smgrextend(DEFAULT_SMGR, reln, blockNum,
(char *) MAKE_PTR(bufHdr->data));
}
else
{
status = smgrread(DEFAULT_SMGR, reln, bufHdr->tag.blockNum,
status = smgrread(DEFAULT_SMGR, reln, blockNum,
(char *) MAKE_PTR(bufHdr->data));
}
......@@ -290,13 +305,13 @@ ReadBufferWithBufferLock(Relation reln,
*
* Returns: descriptor for buffer
*
* When this routine returns, the BufMgrLock is guaranteed NOT be held.
* BufMgrLock must be held at entry. When this routine returns,
* the BufMgrLock is guaranteed NOT to be held.
*/
static BufferDesc *
BufferAlloc(Relation reln,
BlockNumber blockNum,
bool *foundPtr,
bool bufferLockHeld)
bool *foundPtr)
{
BufferDesc *buf,
*buf2;
......@@ -305,16 +320,8 @@ BufferAlloc(Relation reln,
/* create a new tag so we can lookup the buffer */
/* assume that the relation is already open */
if (blockNum == P_NEW)
{
blockNum = smgrnblocks(DEFAULT_SMGR, reln);
}
INIT_BUFFERTAG(&newTag, reln, blockNum);
if (!bufferLockHeld)
SpinAcquire(BufMgrLock);
/* see if the block is in the buffer pool already */
buf = BufTableLookup(&newTag);
if (buf != NULL)
......@@ -666,25 +673,34 @@ WriteNoReleaseBuffer(Buffer buffer)
#undef ReleaseAndReadBuffer
/*
* ReleaseAndReadBuffer -- combine ReleaseBuffer() and ReadBuffer()
* so that only one semop needs to be called.
* to save a spinlock release/acquire.
*
* An additional frammish of this routine is that the caller may perform
* file extension (as if blockNum = P_NEW) by passing the actual current
* EOF block number as blockNum and setting isExtend true. This hack
* allows us to avoid calling smgrnblocks() again when the caller has
* already done it.
*
* Note: it is OK to pass buffer = InvalidBuffer, indicating that no old
* buffer actually needs to be released. This case is the same as ReadBuffer
* except for the isExtend option.
*/
Buffer
ReleaseAndReadBuffer(Buffer buffer,
Relation relation,
BlockNumber blockNum)
BlockNumber blockNum,
bool isExtend)
{
BufferDesc *bufHdr;
Buffer retbuf;
if (BufferIsLocal(buffer))
{
Assert(LocalRefCount[-buffer - 1] > 0);
LocalRefCount[-buffer - 1]--;
}
else
if (BufferIsValid(buffer))
{
if (BufferIsValid(buffer))
if (BufferIsLocal(buffer))
{
Assert(LocalRefCount[-buffer - 1] > 0);
LocalRefCount[-buffer - 1]--;
}
else
{
bufHdr = &BufferDescriptors[buffer - 1];
Assert(PrivateRefCount[buffer - 1] > 0);
......@@ -701,13 +717,14 @@ ReleaseAndReadBuffer(Buffer buffer,
AddBufferToFreelist(bufHdr);
bufHdr->flags |= BM_FREE;
}
retbuf = ReadBufferWithBufferLock(relation, blockNum, true);
return retbuf;
return ReadBufferInternal(relation, blockNum,
isExtend, true);
}
}
}
return ReadBuffer(relation, blockNum);
return ReadBufferInternal(relation, blockNum,
isExtend, false);
}
/*
......@@ -1735,13 +1752,14 @@ ReleaseAndReadBuffer_Debug(char *file,
int line,
Buffer buffer,
Relation relation,
BlockNumber blockNum)
BlockNumber blockNum,
bool isExtend)
{
bool bufferValid;
Buffer b;
bufferValid = BufferIsValid(buffer);
b = ReleaseAndReadBuffer(buffer, relation, blockNum);
b = ReleaseAndReadBuffer(buffer, relation, blockNum, isExtend);
if (ShowPinTrace && bufferValid && BufferIsLocal(buffer)
&& is_userbuffer(buffer))
{
......
......@@ -16,7 +16,7 @@
*
*
* IDENTIFICATION
* $Header: /cvsroot/pgsql/src/backend/storage/buffer/localbuf.c,v 1.40 2001/03/22 03:59:44 momjian Exp $
* $Header: /cvsroot/pgsql/src/backend/storage/buffer/localbuf.c,v 1.41 2001/05/12 19:58:27 tgl Exp $
*
*-------------------------------------------------------------------------
*/
......@@ -54,12 +54,6 @@ LocalBufferAlloc(Relation reln, BlockNumber blockNum, bool *foundPtr)
int i;
BufferDesc *bufHdr = (BufferDesc *) NULL;
if (blockNum == P_NEW)
{
blockNum = reln->rd_nblocks;
reln->rd_nblocks++;
}
/* a low tech search for now -- not optimized for scans */
for (i = 0; i < NLocBuffer; i++)
{
......
......@@ -7,7 +7,7 @@
* Portions Copyright (c) 1996-2001, PostgreSQL Global Development Group
* Portions Copyright (c) 1994, Regents of the University of California
*
* $Id: bufmgr.h,v 1.50 2001/03/22 04:01:05 momjian Exp $
* $Id: bufmgr.h,v 1.51 2001/05/12 19:58:28 tgl Exp $
*
*-------------------------------------------------------------------------
*/
......@@ -164,7 +164,7 @@ extern Buffer ReadBuffer(Relation reln, BlockNumber blockNum);
extern int WriteBuffer(Buffer buffer);
extern int WriteNoReleaseBuffer(Buffer buffer);
extern Buffer ReleaseAndReadBuffer(Buffer buffer, Relation relation,
BlockNumber blockNum);
BlockNumber blockNum, bool isExtend);
extern int FlushBuffer(Buffer buffer, bool sync, bool release);
extern void InitBufferPool(void);
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment