Commit a8405cfc authored by Tom Lane's avatar Tom Lane

Acquire read lock on a buffer while writing it out, to prevent

concurrent modifications to the page by other backends.
parent 384d94ef
...@@ -8,7 +8,7 @@ ...@@ -8,7 +8,7 @@
* *
* *
* IDENTIFICATION * IDENTIFICATION
* $Header: /cvsroot/pgsql/src/backend/storage/buffer/bufmgr.c,v 1.82 2000/06/08 22:37:20 momjian Exp $ * $Header: /cvsroot/pgsql/src/backend/storage/buffer/bufmgr.c,v 1.83 2000/09/25 04:11:09 tgl Exp $
* *
*------------------------------------------------------------------------- *-------------------------------------------------------------------------
*/ */
...@@ -429,17 +429,10 @@ BufferAlloc(Relation reln, ...@@ -429,17 +429,10 @@ BufferAlloc(Relation reln,
inProgress = FALSE; inProgress = FALSE;
for (buf = (BufferDesc *) NULL; buf == (BufferDesc *) NULL;) for (buf = (BufferDesc *) NULL; buf == (BufferDesc *) NULL;)
{ {
/* GetFreeBuffer will abort if it can't find a free buffer */
buf = GetFreeBuffer(); buf = GetFreeBuffer();
/* /* GetFreeBuffer will abort if it can't find a free buffer */
* But it can return buf == NULL if we are in aborting transaction Assert(buf);
* now and so elog(ERROR,...) in GetFreeBuffer will not abort
* again.
*/
if (buf == NULL)
return NULL;
/* /*
* There should be exactly one pin on the buffer after it is * There should be exactly one pin on the buffer after it is
...@@ -789,11 +782,21 @@ FlushBuffer(Buffer buffer, bool release) ...@@ -789,11 +782,21 @@ FlushBuffer(Buffer buffer, bool release)
WaitIO(bufHdr, BufMgrLock); /* confirm end of IO */ WaitIO(bufHdr, BufMgrLock); /* confirm end of IO */
bufHdr->flags &= ~BM_JUST_DIRTIED; bufHdr->flags &= ~BM_JUST_DIRTIED;
StartBufferIO(bufHdr, false); /* output IO start */ StartBufferIO(bufHdr, false); /* output IO start */
SpinRelease(BufMgrLock); SpinRelease(BufMgrLock);
/*
* Grab a read lock on the buffer to ensure that no
* other backend changes its contents while we write it;
* see comments in BufferSync().
*/
LockBuffer(BufferDescriptorGetBuffer(bufHdr), BUFFER_LOCK_SHARE);
status = smgrflush(DEFAULT_SMGR, bufrel, bufHdr->tag.blockNum, status = smgrflush(DEFAULT_SMGR, bufrel, bufHdr->tag.blockNum,
(char *) MAKE_PTR(bufHdr->data)); (char *) MAKE_PTR(bufHdr->data));
LockBuffer(BufferDescriptorGetBuffer(bufHdr), BUFFER_LOCK_UNLOCK);
/* drop relcache refcnt incremented by RelationIdCacheGetRelation */ /* drop relcache refcnt incremented by RelationIdCacheGetRelation */
RelationDecrementReferenceCount(bufrel); RelationDecrementReferenceCount(bufrel);
...@@ -1017,19 +1020,6 @@ ClearBufferDirtiedByMe(Buffer buffer, BufferDesc *bufHdr) ...@@ -1017,19 +1020,6 @@ ClearBufferDirtiedByMe(Buffer buffer, BufferDesc *bufHdr)
* that have been dirtied by the current xact and flush them to disk. * that have been dirtied by the current xact and flush them to disk.
* We do *not* flush dirty buffers that have been dirtied by other xacts. * We do *not* flush dirty buffers that have been dirtied by other xacts.
* (This is a substantial change from pre-7.0 behavior.) * (This is a substantial change from pre-7.0 behavior.)
*
* OLD COMMENTS (do these still apply?)
*
* Also, we need to be sure that no other transaction is
* modifying the page as we flush it. This is only a problem for objects
* that use a non-two-phase locking protocol, like btree indices. For
* those objects, we would like to set a write lock for the duration of
* our IO. Another possibility is to code updates to btree pages
* carefully, so that writing them out out of order cannot cause
* any unrecoverable errors.
*
* I don't want to think hard about this right now, so I will try
* to come back to it later.
*/ */
static void static void
BufferSync() BufferSync()
...@@ -1112,15 +1102,28 @@ BufferSync() ...@@ -1112,15 +1102,28 @@ BufferSync()
bufHdr->flags &= ~BM_JUST_DIRTIED; bufHdr->flags &= ~BM_JUST_DIRTIED;
StartBufferIO(bufHdr, false); /* output IO start */ StartBufferIO(bufHdr, false); /* output IO start */
SpinRelease(BufMgrLock);
/*
* Grab a read lock on the buffer to ensure that no
* other backend changes its contents while we write it;
* otherwise we could write a non-self-consistent page
* image to disk, which'd be bad news if the other
* transaction aborts before writing its changes.
*
* Note that we still need the BM_JUST_DIRTIED mechanism
* in case someone dirties the buffer just before we
* grab this lock or just after we release it.
*/
LockBuffer(BufferDescriptorGetBuffer(bufHdr),
BUFFER_LOCK_SHARE);
/* /*
* If we didn't have the reldesc in our local cache, * If we didn't have the reldesc in our local cache,
* write this page out using the 'blind write' storage * write this page out using the 'blind write' storage
* manager routine. If we did find it, use the * manager routine. If we did find it, use the
* standard interface. * standard interface.
*/ */
#ifndef OPTIMIZE_SINGLE
SpinRelease(BufMgrLock);
#endif /* OPTIMIZE_SINGLE */
if (reln == (Relation) NULL) if (reln == (Relation) NULL)
{ {
status = smgrblindwrt(DEFAULT_SMGR, status = smgrblindwrt(DEFAULT_SMGR,
...@@ -1137,9 +1140,14 @@ BufferSync() ...@@ -1137,9 +1140,14 @@ BufferSync()
bufHdr->tag.blockNum, bufHdr->tag.blockNum,
(char *) MAKE_PTR(bufHdr->data)); (char *) MAKE_PTR(bufHdr->data));
} }
#ifndef OPTIMIZE_SINGLE
/*
* Release the per-buffer readlock, reacquire BufMgrLock.
*/
LockBuffer(BufferDescriptorGetBuffer(bufHdr),
BUFFER_LOCK_UNLOCK);
SpinAcquire(BufMgrLock); SpinAcquire(BufMgrLock);
#endif /* OPTIMIZE_SINGLE */
UnpinBuffer(bufHdr); UnpinBuffer(bufHdr);
if (status == SM_FAIL) if (status == SM_FAIL)
...@@ -1522,9 +1530,14 @@ BufferReplace(BufferDesc *bufHdr) ...@@ -1522,9 +1530,14 @@ BufferReplace(BufferDesc *bufHdr)
/* To check if block content changed while flushing. - vadim 01/17/97 */ /* To check if block content changed while flushing. - vadim 01/17/97 */
bufHdr->flags &= ~BM_JUST_DIRTIED; bufHdr->flags &= ~BM_JUST_DIRTIED;
#ifndef OPTIMIZE_SINGLE
SpinRelease(BufMgrLock); SpinRelease(BufMgrLock);
#endif /* OPTIMIZE_SINGLE */
/*
* Grab a read lock on the buffer to ensure that no
* other backend changes its contents while we write it;
* see comments in BufferSync().
*/
LockBuffer(BufferDescriptorGetBuffer(bufHdr), BUFFER_LOCK_SHARE);
if (reln != (Relation) NULL) if (reln != (Relation) NULL)
{ {
...@@ -1540,9 +1553,9 @@ BufferReplace(BufferDesc *bufHdr) ...@@ -1540,9 +1553,9 @@ BufferReplace(BufferDesc *bufHdr)
false); /* no fsync */ false); /* no fsync */
} }
#ifndef OPTIMIZE_SINGLE LockBuffer(BufferDescriptorGetBuffer(bufHdr), BUFFER_LOCK_UNLOCK);
SpinAcquire(BufMgrLock); SpinAcquire(BufMgrLock);
#endif /* OPTIMIZE_SINGLE */
/* drop relcache refcnt incremented by RelationIdCacheGetRelation */ /* drop relcache refcnt incremented by RelationIdCacheGetRelation */
if (reln != (Relation) NULL) if (reln != (Relation) NULL)
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment