Commit 1f6d8b90 authored by Tom Lane

Buffer manager modifications to keep a local buffer-dirtied bit as well

as a shared dirtybit for each shared buffer.  The shared dirtybit still
controls writing the buffer, but the local bit controls whether we need
to fsync the buffer's file.  This arrangement fixes a bug that allowed
some required fsyncs to be missed, and should improve performance as well.
For more info, see my post of the same date on the pgsql-hackers mailing list.
parent 9c38a8d2
...@@ -8,7 +8,7 @@ ...@@ -8,7 +8,7 @@
* *
* *
* IDENTIFICATION * IDENTIFICATION
* $Header: /cvsroot/pgsql/src/backend/access/transam/xact.c,v 1.62 2000/03/17 02:36:05 tgl Exp $ * $Header: /cvsroot/pgsql/src/backend/access/transam/xact.c,v 1.63 2000/04/09 04:43:16 tgl Exp $
* *
* NOTES * NOTES
* Transaction aborts can now occur two ways: * Transaction aborts can now occur two ways:
...@@ -642,7 +642,7 @@ RecordTransactionCommit() ...@@ -642,7 +642,7 @@ RecordTransactionCommit()
{ {
FlushBufferPool(); FlushBufferPool();
if (leak) if (leak)
ResetBufferPool(); ResetBufferPool(true);
/* /*
* have the transaction access methods record the status * have the transaction access methods record the status
...@@ -658,7 +658,7 @@ RecordTransactionCommit() ...@@ -658,7 +658,7 @@ RecordTransactionCommit()
} }
if (leak) if (leak)
ResetBufferPool(); ResetBufferPool(true);
} }
...@@ -759,7 +759,10 @@ RecordTransactionAbort() ...@@ -759,7 +759,10 @@ RecordTransactionAbort()
if (SharedBufferChanged && !TransactionIdDidCommit(xid)) if (SharedBufferChanged && !TransactionIdDidCommit(xid))
TransactionIdAbort(xid); TransactionIdAbort(xid);
ResetBufferPool(); /*
* Tell bufmgr and smgr to release resources.
*/
ResetBufferPool(false); /* false -> is abort */
} }
/* -------------------------------- /* --------------------------------
......
...@@ -8,7 +8,7 @@ ...@@ -8,7 +8,7 @@
* *
* *
* IDENTIFICATION * IDENTIFICATION
* $Header: /cvsroot/pgsql/src/backend/catalog/catalog.c,v 1.30 2000/01/26 05:56:10 momjian Exp $ * $Header: /cvsroot/pgsql/src/backend/catalog/catalog.c,v 1.31 2000/04/09 04:43:15 tgl Exp $
* *
*------------------------------------------------------------------------- *-------------------------------------------------------------------------
*/ */
...@@ -23,25 +23,87 @@ ...@@ -23,25 +23,87 @@
#include "utils/syscache.h" #include "utils/syscache.h"
/* /*
* relpath - path to the relation * relpath - construct path to a relation's file
* Perhaps this should be in-line code in relopen(). *
* Note that this only works with relations that are visible to the current
* backend, ie, either in the current database or shared system relations.
*
* Result is a palloc'd string.
*/ */
char * char *
relpath(const char *relname) relpath(const char *relname)
{ {
char *path; char *path;
size_t bufsize = 0;
if (IsSharedSystemRelationName(relname)) if (IsSharedSystemRelationName(relname))
{ {
bufsize = strlen(DataDir) + sizeof(NameData) + 2; /* Shared system relations live in DataDir */
size_t bufsize = strlen(DataDir) + sizeof(NameData) + 2;
path = (char *) palloc(bufsize); path = (char *) palloc(bufsize);
snprintf(path, bufsize, "%s/%s", DataDir, relname); snprintf(path, bufsize, "%s%c%s", DataDir, SEP_CHAR, relname);
return path; return path;
} }
/*
* If it is in the current database, assume it is in current working
* directory. NB: this does not work during bootstrap!
*/
return pstrdup(relname); return pstrdup(relname);
} }
/*
 * relpath_blind			- construct path to a relation's file
 *
 * Construct the path using only the info available to smgrblindwrt,
 * namely the names and OIDs of the database and relation.  (Shared system
 * relations are identified with dbid = 0.)  Note that we may have to
 * access a relation belonging to a different database!
 *
 * The directory part is chosen from dbid:
 *	- dbid == 0:			DataDir
 *	- dbid == MyDatabaseId:	DatabasePath
 *	- anything else:		the path GetRawDatabaseInfo() reports for dbname,
 *							expanded by ExpandDatabasePath()
 * relid is accepted for interface symmetry but is not consulted here.
 *
 * The output buffer is sized for a relname of at most sizeof(NameData)
 * bytes.  Formatting is bounded with snprintf, as relpath() does, so that
 * an over-long name is truncated instead of being written past the end
 * of the allocation.
 *
 * Result is a palloc'd string.
 */
char *
relpath_blind(const char *dbname, const char *relname,
			  Oid dbid, Oid relid)
{
	char	   *path;
	size_t		bufsize;

	if (dbid == (Oid) 0)
	{
		/* Shared system relations live in DataDir */
		bufsize = strlen(DataDir) + sizeof(NameData) + 2;
		path = (char *) palloc(bufsize);
		snprintf(path, bufsize, "%s%c%s", DataDir, SEP_CHAR, relname);
	}
	else if (dbid == MyDatabaseId)
	{
		/* XXX why is this inconsistent with relpath() ? */
		bufsize = strlen(DatabasePath) + sizeof(NameData) + 2;
		path = (char *) palloc(bufsize);
		snprintf(path, bufsize, "%s%c%s", DatabasePath, SEP_CHAR, relname);
	}
	else
	{
		/* this is work around only !!! */
		char		dbpathtmp[MAXPGPATH];
		Oid			id;
		char	   *dbpath;

		/* Look up the other database by name and cross-check its OID */
		GetRawDatabaseInfo(dbname, &id, dbpathtmp);

		if (id != dbid)
			elog(FATAL, "relpath_blind: oid of db %s is not %u",
				 dbname, dbid);

		dbpath = ExpandDatabasePath(dbpathtmp);
		if (dbpath == NULL)
			elog(FATAL, "relpath_blind: can't expand path for db %s",
				 dbname);

		bufsize = strlen(dbpath) + sizeof(NameData) + 2;
		path = (char *) palloc(bufsize);
		snprintf(path, bufsize, "%s%c%s", dbpath, SEP_CHAR, relname);

		/* the expanded directory name is only needed to build the result */
		pfree(dbpath);
	}

	return path;
}
/* /*
* IsSystemRelationName * IsSystemRelationName
* True iff name is the name of a system catalog relation. * True iff name is the name of a system catalog relation.
......
...@@ -8,7 +8,7 @@ ...@@ -8,7 +8,7 @@
* *
* *
* IDENTIFICATION * IDENTIFICATION
* $Header: /cvsroot/pgsql/src/backend/storage/buffer/buf_init.c,v 1.32 2000/01/26 05:56:50 momjian Exp $ * $Header: /cvsroot/pgsql/src/backend/storage/buffer/buf_init.c,v 1.33 2000/04/09 04:43:18 tgl Exp $
* *
*------------------------------------------------------------------------- *-------------------------------------------------------------------------
*/ */
...@@ -65,9 +65,11 @@ long *NWaitIOBackendP; ...@@ -65,9 +65,11 @@ long *NWaitIOBackendP;
extern IpcSemaphoreId WaitIOSemId; extern IpcSemaphoreId WaitIOSemId;
long *PrivateRefCount; /* also used in freelist.c */ long *PrivateRefCount; /* also used in freelist.c */
bits8 *BufferLocks; /* */ bits8 *BufferLocks; /* flag bits showing locks I have set */
long *CommitInfoNeedsSave;/* to write buffers where we have filled BufferTag *BufferTagLastDirtied; /* tag buffer had when last dirtied by me */
* in t_infomask */ BufferBlindId *BufferBlindLastDirtied; /* and its BlindId too */
bool *BufferDirtiedByMe; /* T if buf has been dirtied in cur xact */
/* /*
* Data Structures: * Data Structures:
...@@ -247,7 +249,9 @@ InitBufferPool(IPCKey key) ...@@ -247,7 +249,9 @@ InitBufferPool(IPCKey key)
#endif #endif
PrivateRefCount = (long *) calloc(NBuffers, sizeof(long)); PrivateRefCount = (long *) calloc(NBuffers, sizeof(long));
BufferLocks = (bits8 *) calloc(NBuffers, sizeof(bits8)); BufferLocks = (bits8 *) calloc(NBuffers, sizeof(bits8));
CommitInfoNeedsSave = (long *) calloc(NBuffers, sizeof(long)); BufferTagLastDirtied = (BufferTag *) calloc(NBuffers, sizeof(BufferTag));
BufferBlindLastDirtied = (BufferBlindId *) calloc(NBuffers, sizeof(BufferBlindId));
BufferDirtiedByMe = (bool *) calloc(NBuffers, sizeof(bool));
} }
/* ----------------------------------------------------- /* -----------------------------------------------------
......
...@@ -8,7 +8,7 @@ ...@@ -8,7 +8,7 @@
* *
* *
* IDENTIFICATION * IDENTIFICATION
* $Header: /cvsroot/pgsql/src/backend/storage/buffer/bufmgr.c,v 1.77 2000/03/31 02:43:31 tgl Exp $ * $Header: /cvsroot/pgsql/src/backend/storage/buffer/bufmgr.c,v 1.78 2000/04/09 04:43:18 tgl Exp $
* *
*------------------------------------------------------------------------- *-------------------------------------------------------------------------
*/ */
...@@ -94,8 +94,10 @@ static Buffer ReadBufferWithBufferLock(Relation relation, BlockNumber blockNum, ...@@ -94,8 +94,10 @@ static Buffer ReadBufferWithBufferLock(Relation relation, BlockNumber blockNum,
bool bufferLockHeld); bool bufferLockHeld);
static BufferDesc *BufferAlloc(Relation reln, BlockNumber blockNum, static BufferDesc *BufferAlloc(Relation reln, BlockNumber blockNum,
bool *foundPtr, bool bufferLockHeld); bool *foundPtr, bool bufferLockHeld);
static void SetBufferDirtiedByMe(Buffer buffer, BufferDesc *bufHdr);
static void ClearBufferDirtiedByMe(Buffer buffer, BufferDesc *bufHdr);
static void BufferSync(void); static void BufferSync(void);
static int BufferReplace(BufferDesc *bufHdr, bool bufferLockHeld); static int BufferReplace(BufferDesc *bufHdr);
void PrintBufferDescs(void); void PrintBufferDescs(void);
/* --------------------------------------------------- /* ---------------------------------------------------
...@@ -176,7 +178,7 @@ is_userbuffer(Buffer buffer) ...@@ -176,7 +178,7 @@ is_userbuffer(Buffer buffer)
{ {
BufferDesc *buf = &BufferDescriptors[buffer - 1]; BufferDesc *buf = &BufferDescriptors[buffer - 1];
if (IsSystemRelationName(buf->sb_relname)) if (IsSystemRelationName(buf->blind.relname))
return false; return false;
return true; return true;
} }
...@@ -199,7 +201,7 @@ ReadBuffer_Debug(char *file, ...@@ -199,7 +201,7 @@ ReadBuffer_Debug(char *file,
fprintf(stderr, "PIN(RD) %ld relname = %s, blockNum = %d, \ fprintf(stderr, "PIN(RD) %ld relname = %s, blockNum = %d, \
refcount = %ld, file: %s, line: %d\n", refcount = %ld, file: %s, line: %d\n",
buffer, buf->sb_relname, buf->tag.blockNum, buffer, buf->blind.relname, buf->tag.blockNum,
PrivateRefCount[buffer - 1], file, line); PrivateRefCount[buffer - 1], file, line);
} }
return buffer; return buffer;
...@@ -390,22 +392,21 @@ BufferAlloc(Relation reln, ...@@ -390,22 +392,21 @@ BufferAlloc(Relation reln,
* If there's no IO for the buffer and the buffer * If there's no IO for the buffer and the buffer
* is BROKEN,it should be read again. So start a * is BROKEN,it should be read again. So start a
* new buffer IO here. * new buffer IO here.
*
* * wierd race condition:
* wierd race condition: *
* * We were waiting for someone else to read the buffer. While
* We were waiting for someone else to read the buffer. While * we were waiting, the reader boof'd in some way, so the
* we were waiting, the reader boof'd in some way, so the * contents of the buffer are still invalid. By saying
* contents of the buffer are still invalid. By saying * that we didn't find it, we can make the caller
* that we didn't find it, we can make the caller * reinitialize the buffer. If two processes are waiting
* reinitialize the buffer. If two processes are waiting * for this block, both will read the block. The second
* for this block, both will read the block. The second * one to finish may overwrite any updates made by the
* one to finish may overwrite any updates made by the * first. (Assume higher level synchronization prevents
* first. (Assume higher level synchronization prevents * this from happening).
* this from happening). *
* * This is never going to happen, don't worry about it.
* This is never going to happen, don't worry about it. */
*/
*foundPtr = FALSE; *foundPtr = FALSE;
} }
#ifdef BMTRACE #ifdef BMTRACE
...@@ -465,33 +466,24 @@ BufferAlloc(Relation reln, ...@@ -465,33 +466,24 @@ BufferAlloc(Relation reln,
* in WaitIO until we're done. * in WaitIO until we're done.
*/ */
inProgress = TRUE; inProgress = TRUE;
#ifdef HAS_TEST_AND_SET
/* /*
* All code paths that acquire this lock pin the buffer first; * All code paths that acquire this lock pin the buffer first;
* since no one had it pinned (it just came off the free * since no one had it pinned (it just came off the free
* list), no one else can have this lock. * list), no one else can have this lock.
*/ */
#endif /* HAS_TEST_AND_SET */
StartBufferIO(buf, false); StartBufferIO(buf, false);
/* /*
* Write the buffer out, being careful to release BufMgrLock * Write the buffer out, being careful to release BufMgrLock
* before starting the I/O. * before starting the I/O.
*
* This #ifndef is here because a few extra semops REALLY kill
* you on machines that don't have spinlocks. If you don't
* operate with much concurrency, well...
*/ */
smok = BufferReplace(buf, true); smok = BufferReplace(buf);
#ifndef OPTIMIZE_SINGLE
SpinAcquire(BufMgrLock);
#endif /* OPTIMIZE_SINGLE */
if (smok == FALSE) if (smok == FALSE)
{ {
elog(NOTICE, "BufferAlloc: cannot write block %u for %s/%s", elog(NOTICE, "BufferAlloc: cannot write block %u for %s/%s",
buf->tag.blockNum, buf->sb_dbname, buf->sb_relname); buf->tag.blockNum, buf->blind.dbname, buf->blind.relname);
inProgress = FALSE; inProgress = FALSE;
buf->flags |= BM_IO_ERROR; buf->flags |= BM_IO_ERROR;
buf->flags &= ~BM_IO_IN_PROGRESS; buf->flags &= ~BM_IO_IN_PROGRESS;
...@@ -516,7 +508,7 @@ BufferAlloc(Relation reln, ...@@ -516,7 +508,7 @@ BufferAlloc(Relation reln,
if (buf->flags & BM_JUST_DIRTIED) if (buf->flags & BM_JUST_DIRTIED)
{ {
elog(FATAL, "BufferAlloc: content of block %u (%s) changed while flushing", elog(FATAL, "BufferAlloc: content of block %u (%s) changed while flushing",
buf->tag.blockNum, buf->sb_relname); buf->tag.blockNum, buf->blind.relname);
} }
else else
buf->flags &= ~BM_DIRTY; buf->flags &= ~BM_DIRTY;
...@@ -562,6 +554,7 @@ BufferAlloc(Relation reln, ...@@ -562,6 +554,7 @@ BufferAlloc(Relation reln,
*/ */
if (buf != NULL) if (buf != NULL)
{ {
buf->flags &= ~BM_IO_IN_PROGRESS;
TerminateBufferIO(buf); TerminateBufferIO(buf);
/* give up the buffer since we don't need it any more */ /* give up the buffer since we don't need it any more */
PrivateRefCount[BufferDescriptorGetBuffer(buf) - 1] = 0; PrivateRefCount[BufferDescriptorGetBuffer(buf) - 1] = 0;
...@@ -572,7 +565,6 @@ BufferAlloc(Relation reln, ...@@ -572,7 +565,6 @@ BufferAlloc(Relation reln,
AddBufferToFreelist(buf); AddBufferToFreelist(buf);
buf->flags |= BM_FREE; buf->flags |= BM_FREE;
} }
buf->flags &= ~BM_IO_IN_PROGRESS;
} }
PinBuffer(buf2); PinBuffer(buf2);
...@@ -619,8 +611,8 @@ BufferAlloc(Relation reln, ...@@ -619,8 +611,8 @@ BufferAlloc(Relation reln,
} }
/* record the database name and relation name for this buffer */ /* record the database name and relation name for this buffer */
strcpy(buf->sb_relname, RelationGetPhysicalRelationName(reln)); strcpy(buf->blind.dbname, DatabaseName);
strcpy(buf->sb_dbname, DatabaseName); strcpy(buf->blind.relname, RelationGetPhysicalRelationName(reln));
INIT_BUFFERTAG(&(buf->tag), reln, blockNum); INIT_BUFFERTAG(&(buf->tag), reln, blockNum);
if (!BufTableInsert(buf)) if (!BufTableInsert(buf))
...@@ -683,9 +675,9 @@ WriteBuffer(Buffer buffer) ...@@ -683,9 +675,9 @@ WriteBuffer(Buffer buffer)
SpinAcquire(BufMgrLock); SpinAcquire(BufMgrLock);
Assert(bufHdr->refcount > 0); Assert(bufHdr->refcount > 0);
bufHdr->flags |= (BM_DIRTY | BM_JUST_DIRTIED); bufHdr->flags |= (BM_DIRTY | BM_JUST_DIRTIED);
SetBufferDirtiedByMe(buffer, bufHdr);
UnpinBuffer(bufHdr); UnpinBuffer(bufHdr);
SpinRelease(BufMgrLock); SpinRelease(BufMgrLock);
CommitInfoNeedsSave[buffer - 1] = 0;
return TRUE; return TRUE;
} }
...@@ -702,7 +694,7 @@ WriteBuffer_Debug(char *file, int line, Buffer buffer) ...@@ -702,7 +694,7 @@ WriteBuffer_Debug(char *file, int line, Buffer buffer)
buf = &BufferDescriptors[buffer - 1]; buf = &BufferDescriptors[buffer - 1];
fprintf(stderr, "UNPIN(WR) %ld relname = %s, blockNum = %d, \ fprintf(stderr, "UNPIN(WR) %ld relname = %s, blockNum = %d, \
refcount = %ld, file: %s, line: %d\n", refcount = %ld, file: %s, line: %d\n",
buffer, buf->sb_relname, buf->tag.blockNum, buffer, buf->blind.relname, buf->tag.blockNum,
PrivateRefCount[buffer - 1], file, line); PrivateRefCount[buffer - 1], file, line);
} }
} }
...@@ -767,8 +759,9 @@ DirtyBufferCopy(Oid dbid, Oid relid, BlockNumber blkno, char *dest) ...@@ -767,8 +759,9 @@ DirtyBufferCopy(Oid dbid, Oid relid, BlockNumber blkno, char *dest)
* *
* 'buffer' is known to be dirty/pinned, so there should not be a * 'buffer' is known to be dirty/pinned, so there should not be a
* problem reading the BufferDesc members without the BufMgrLock * problem reading the BufferDesc members without the BufMgrLock
* (nobody should be able to change tags, flags, etc. out from under * (nobody should be able to change tags out from under us).
* us). Unpin if 'release' is TRUE. *
* Unpin if 'release' is TRUE.
*/ */
int int
FlushBuffer(Buffer buffer, bool release) FlushBuffer(Buffer buffer, bool release)
...@@ -784,6 +777,8 @@ FlushBuffer(Buffer buffer, bool release) ...@@ -784,6 +777,8 @@ FlushBuffer(Buffer buffer, bool release)
if (BAD_BUFFER_ID(buffer)) if (BAD_BUFFER_ID(buffer))
return STATUS_ERROR; return STATUS_ERROR;
Assert(PrivateRefCount[buffer - 1] > 0); /* else caller didn't pin */
bufHdr = &BufferDescriptors[buffer - 1]; bufHdr = &BufferDescriptors[buffer - 1];
bufdb = bufHdr->tag.relId.dbId; bufdb = bufHdr->tag.relId.dbId;
...@@ -809,7 +804,7 @@ FlushBuffer(Buffer buffer, bool release) ...@@ -809,7 +804,7 @@ FlushBuffer(Buffer buffer, bool release)
if (status == SM_FAIL) if (status == SM_FAIL)
{ {
elog(ERROR, "FlushBuffer: cannot flush block %u of the relation %s", elog(ERROR, "FlushBuffer: cannot flush block %u of the relation %s",
bufHdr->tag.blockNum, bufHdr->sb_relname); bufHdr->tag.blockNum, bufHdr->blind.relname);
return STATUS_ERROR; return STATUS_ERROR;
} }
BufferFlushCount++; BufferFlushCount++;
...@@ -820,19 +815,21 @@ FlushBuffer(Buffer buffer, bool release) ...@@ -820,19 +815,21 @@ FlushBuffer(Buffer buffer, bool release)
/* /*
* If this buffer was marked by someone as DIRTY while we were * If this buffer was marked by someone as DIRTY while we were
* flushing it out we must not clear DIRTY flag - vadim 01/17/97 * flushing it out we must not clear shared DIRTY flag - vadim 01/17/97
*
* ... but we can clear BufferDirtiedByMe anyway - tgl 3/31/00
*/ */
if (bufHdr->flags & BM_JUST_DIRTIED) if (bufHdr->flags & BM_JUST_DIRTIED)
{ {
elog(NOTICE, "FlushBuffer: content of block %u (%s) changed while flushing", elog(NOTICE, "FlushBuffer: content of block %u (%s) changed while flushing",
bufHdr->tag.blockNum, bufHdr->sb_relname); bufHdr->tag.blockNum, bufHdr->blind.relname);
} }
else else
bufHdr->flags &= ~BM_DIRTY; bufHdr->flags &= ~BM_DIRTY;
ClearBufferDirtiedByMe(buffer, bufHdr);
if (release) if (release)
UnpinBuffer(bufHdr); UnpinBuffer(bufHdr);
SpinRelease(BufMgrLock); SpinRelease(BufMgrLock);
CommitInfoNeedsSave[buffer - 1] = 0;
return STATUS_OK; return STATUS_OK;
} }
...@@ -857,9 +854,10 @@ WriteNoReleaseBuffer(Buffer buffer) ...@@ -857,9 +854,10 @@ WriteNoReleaseBuffer(Buffer buffer)
SharedBufferChanged = true; SharedBufferChanged = true;
SpinAcquire(BufMgrLock); SpinAcquire(BufMgrLock);
Assert(bufHdr->refcount > 0);
bufHdr->flags |= (BM_DIRTY | BM_JUST_DIRTIED); bufHdr->flags |= (BM_DIRTY | BM_JUST_DIRTIED);
SetBufferDirtiedByMe(buffer, bufHdr);
SpinRelease(BufMgrLock); SpinRelease(BufMgrLock);
CommitInfoNeedsSave[buffer - 1] = 0;
return STATUS_OK; return STATUS_OK;
} }
...@@ -901,11 +899,6 @@ ReleaseAndReadBuffer(Buffer buffer, ...@@ -901,11 +899,6 @@ ReleaseAndReadBuffer(Buffer buffer,
AddBufferToFreelist(bufHdr); AddBufferToFreelist(bufHdr);
bufHdr->flags |= BM_FREE; bufHdr->flags |= BM_FREE;
} }
if (CommitInfoNeedsSave[buffer - 1])
{
bufHdr->flags |= (BM_DIRTY | BM_JUST_DIRTIED);
CommitInfoNeedsSave[buffer - 1] = 0;
}
retbuf = ReadBufferWithBufferLock(relation, blockNum, true); retbuf = ReadBufferWithBufferLock(relation, blockNum, true);
return retbuf; return retbuf;
} }
...@@ -915,13 +908,120 @@ ReleaseAndReadBuffer(Buffer buffer, ...@@ -915,13 +908,120 @@ ReleaseAndReadBuffer(Buffer buffer,
return ReadBuffer(relation, blockNum); return ReadBuffer(relation, blockNum);
} }
/*
 * SetBufferDirtiedByMe -- mark a shared buffer as being dirtied by this xact
 *
 * The per-backend flag records that this buffer must be written and fsync'd
 * before the current transaction may commit.  Some other backend may end up
 * doing the physical write, but the fsync is our job (otherwise we could
 * commit before the data is really on disk).  No fsync is issued right at
 * write time; the storage manager tracks which files need one before commit.
 * Because we remember the tag and blind ID the buffer had when we dirtied
 * it, we can still tell the storage manager that an fsync is needed after
 * another backend has written the page and reused the buffer for something
 * else.
 *
 * NB: we must be holding the bufmgr lock at entry, and the buffer must be
 * pinned so that no other backend can take it away from us.
 */
static void
SetBufferDirtiedByMe(Buffer buffer, BufferDesc *bufHdr)
{
	BufferTag  *prevTag = &BufferTagLastDirtied[buffer - 1];
	BufferBlindId *prevBlind = &BufferBlindLastDirtied[buffer - 1];
	bool	   *dirtiedByMe = &BufferDirtiedByMe[buffer - 1];

	if (*dirtiedByMe)
	{
		Relation	rel;
		int			result;
		bool		sameTag;

		/*
		 * We already have a pending mark on this buffer slot.  If it is for
		 * the very page now in the buffer there is nothing more to record.
		 */
		sameTag = (bufHdr->tag.relId.dbId == prevTag->relId.dbId &&
				   bufHdr->tag.relId.relId == prevTag->relId.relId &&
				   bufHdr->tag.blockNum == prevTag->blockNum);
		if (sameTag)
			return;

		/*
		 * The remembered tag is for a different page, so some other backend
		 * has already written out what we dirtied.  Before the old tag is
		 * overwritten, tell the storage manager that an fsync is pending on
		 * that file.  The bufmgr lock is dropped around the smgr call.
		 */
#ifndef OPTIMIZE_SINGLE
		SpinRelease(BufMgrLock);
#endif	 /* OPTIMIZE_SINGLE */

		rel = RelationIdCacheGetRelation(prevTag->relId.relId);

		if (rel != (Relation) NULL)
		{
			result = smgrmarkdirty(DEFAULT_SMGR, rel,
								   prevTag->blockNum);
			/* drop relcache refcnt incremented by RelationIdCacheGetRelation */
			RelationDecrementReferenceCount(rel);
		}
		else
		{
			/* relation not in our relcache: use the "blind" interface */
			result = smgrblindmarkdirty(DEFAULT_SMGR,
										prevBlind->dbname,
										prevBlind->relname,
										prevTag->relId.dbId,
										prevTag->relId.relId,
										prevTag->blockNum);
		}

		if (result == SM_FAIL)
		{
			elog(ERROR, "SetBufferDirtiedByMe: cannot mark %u for %s",
				 prevTag->blockNum,
				 prevBlind->relname);
		}

#ifndef OPTIMIZE_SINGLE
		SpinAcquire(BufMgrLock);
#endif	 /* OPTIMIZE_SINGLE */
	}

	/* Remember what the buffer holds now, and that we dirtied it */
	*prevTag = bufHdr->tag;
	*prevBlind = bufHdr->blind;
	*dirtiedByMe = true;
}
/*
 * ClearBufferDirtiedByMe -- mark a shared buffer as no longer needing fsync
 *
 * When this backend writes a buffer out itself, the storage manager sets its
 * own needs-fsync flag for that file, so our local "do it later" flag can be
 * dropped.
 *
 * NB: we must be holding the bufmgr lock at entry.
 */
static void
ClearBufferDirtiedByMe(Buffer buffer, BufferDesc *bufHdr)
{
	BufferTag  *remembered = &BufferTagLastDirtied[buffer - 1];

	/*
	 * Leave the flag alone unless the page just written is the one we
	 * remembered dirtying.  A mismatch is unlikely, but possible if some
	 * other backend replaced the buffer contents since we set our flag.
	 */
	if (bufHdr->tag.relId.dbId != remembered->relId.dbId)
		return;
	if (bufHdr->tag.relId.relId != remembered->relId.relId)
		return;
	if (bufHdr->tag.blockNum != remembered->blockNum)
		return;

	BufferDirtiedByMe[buffer - 1] = false;
}
/* /*
* BufferSync -- Flush all dirty buffers in the pool. * BufferSync -- Flush all dirty buffers in the pool.
* *
* This is called at transaction commit time. It does the wrong thing, * This is called at transaction commit time. We find all buffers
* right now. We should flush only our own changes to stable storage, * that have been dirtied by the current xact and flush them to disk.
* and we should obey the lock protocol on the buffer manager metadata * We do *not* flush dirty buffers that have been dirtied by other xacts.
* as we do it. Also, we need to be sure that no other transaction is * (This is a substantial change from pre-7.0 behavior.)
*
* OLD COMMENTS (do these still apply?)
*
* Also, we need to be sure that no other transaction is
* modifying the page as we flush it. This is only a problem for objects * modifying the page as we flush it. This is only a problem for objects
* that use a non-two-phase locking protocol, like btree indices. For * that use a non-two-phase locking protocol, like btree indices. For
* those objects, we would like to set a write lock for the duration of * those objects, we would like to set a write lock for the duration of
...@@ -936,21 +1036,49 @@ static void ...@@ -936,21 +1036,49 @@ static void
BufferSync() BufferSync()
{ {
int i; int i;
Oid bufdb;
Oid bufrel;
Relation reln;
BufferDesc *bufHdr; BufferDesc *bufHdr;
int status; int status;
Relation reln;
bool didwrite;
SpinAcquire(BufMgrLock);
for (i = 0, bufHdr = BufferDescriptors; i < NBuffers; i++, bufHdr++) for (i = 0, bufHdr = BufferDescriptors; i < NBuffers; i++, bufHdr++)
{ {
/* Ignore buffers that were not dirtied by me */
if (! BufferDirtiedByMe[i])
continue;
SpinAcquire(BufMgrLock);
/*
* We only need to write if the buffer is still dirty and still
* contains the same disk page that it contained when we dirtied it.
* Otherwise, someone else has already written our changes for us,
* and we need only fsync.
*
* (NOTE: it's still possible to do an unnecessary write, if other
* xacts have written and then re-dirtied the page since our last
* change to it. But that should be pretty uncommon, and there's
* no easy way to detect it anyway.)
*/
reln = NULL;
didwrite = false;
if ((bufHdr->flags & BM_VALID) && (bufHdr->flags & BM_DIRTY)) if ((bufHdr->flags & BM_VALID) && (bufHdr->flags & BM_DIRTY))
{ {
Oid bufdb;
Oid bufrel;
bufdb = bufHdr->tag.relId.dbId; bufdb = bufHdr->tag.relId.dbId;
bufrel = bufHdr->tag.relId.relId; bufrel = bufHdr->tag.relId.relId;
if (bufdb == MyDatabaseId || bufdb == (Oid) 0) if (bufdb == BufferTagLastDirtied[i].relId.dbId &&
bufrel == BufferTagLastDirtied[i].relId.relId &&
bufHdr->tag.blockNum == BufferTagLastDirtied[i].blockNum)
{ {
/*
* Try to find relation for buf. This could fail, if the
* rel has been flushed from the relcache since we dirtied
* the page. That should be uncommon, so paying the extra
* cost of a blind write when it happens seems OK.
*/
reln = RelationIdCacheGetRelation(bufrel); reln = RelationIdCacheGetRelation(bufrel);
/* /*
...@@ -970,74 +1098,114 @@ BufferSync() ...@@ -970,74 +1098,114 @@ BufferSync()
if (bufHdr->flags & BM_IO_ERROR) if (bufHdr->flags & BM_IO_ERROR)
{ {
elog(ERROR, "BufferSync: write error %u for %s", elog(ERROR, "BufferSync: write error %u for %s",
bufHdr->tag.blockNum, bufHdr->sb_relname); bufHdr->tag.blockNum, bufHdr->blind.relname);
} }
/* drop refcnt from RelationIdCacheGetRelation */
if (reln != (Relation) NULL)
RelationDecrementReferenceCount(reln);
continue;
}
/*
* To check if block content changed while flushing (see
* below). - vadim 01/17/97
*/
WaitIO(bufHdr, BufMgrLock); /* confirm end of IO */
bufHdr->flags &= ~BM_JUST_DIRTIED;
StartBufferIO(bufHdr, false); /* output IO start */
/*
* If we didn't have the reldesc in our local cache, flush
* this page out using the 'blind write' storage manager
* routine. If we did find it, use the standard
* interface.
*/
#ifndef OPTIMIZE_SINGLE
SpinRelease(BufMgrLock);
#endif /* OPTIMIZE_SINGLE */
if (reln == (Relation) NULL)
{
status = smgrblindwrt(DEFAULT_SMGR, bufHdr->sb_dbname,
bufHdr->sb_relname, bufdb, bufrel,
bufHdr->tag.blockNum,
(char *) MAKE_PTR(bufHdr->data));
} }
else else
{ {
status = smgrwrite(DEFAULT_SMGR, reln, /*
bufHdr->tag.blockNum, * To check if block content changed while flushing (see
(char *) MAKE_PTR(bufHdr->data)); * below). - vadim 01/17/97
} */
WaitIO(bufHdr, BufMgrLock); /* confirm end of IO */
bufHdr->flags &= ~BM_JUST_DIRTIED;
StartBufferIO(bufHdr, false); /* output IO start */
/*
* If we didn't have the reldesc in our local cache, write
* this page out using the 'blind write' storage manager
* routine. If we did find it, use the standard
* interface.
*/
#ifndef OPTIMIZE_SINGLE #ifndef OPTIMIZE_SINGLE
SpinAcquire(BufMgrLock); SpinRelease(BufMgrLock);
#endif /* OPTIMIZE_SINGLE */
if (reln == (Relation) NULL)
{
status = smgrblindwrt(DEFAULT_SMGR,
bufHdr->blind.dbname,
bufHdr->blind.relname,
bufdb, bufrel,
bufHdr->tag.blockNum,
(char *) MAKE_PTR(bufHdr->data));
}
else
{
status = smgrwrite(DEFAULT_SMGR, reln,
bufHdr->tag.blockNum,
(char *) MAKE_PTR(bufHdr->data));
}
#ifndef OPTIMIZE_SINGLE
SpinAcquire(BufMgrLock);
#endif /* OPTIMIZE_SINGLE */ #endif /* OPTIMIZE_SINGLE */
UnpinBuffer(bufHdr); UnpinBuffer(bufHdr);
if (status == SM_FAIL) if (status == SM_FAIL)
{ {
bufHdr->flags |= BM_IO_ERROR; bufHdr->flags |= BM_IO_ERROR;
elog(ERROR, "BufferSync: cannot write %u for %s", elog(ERROR, "BufferSync: cannot write %u for %s",
bufHdr->tag.blockNum, bufHdr->sb_relname); bufHdr->tag.blockNum, bufHdr->blind.relname);
}
bufHdr->flags &= ~BM_IO_IN_PROGRESS; /* mark IO finished */
TerminateBufferIO(bufHdr); /* Sync IO finished */
BufferFlushCount++;
didwrite = true;
/*
* If this buffer was marked by someone as DIRTY while we
* were flushing it out we must not clear DIRTY flag -
* vadim 01/17/97
*
* but it is OK to clear BufferDirtiedByMe - tgl 3/31/00
*/
if (!(bufHdr->flags & BM_JUST_DIRTIED))
bufHdr->flags &= ~BM_DIRTY;
} }
bufHdr->flags &= ~BM_IO_IN_PROGRESS; /* mark IO finished */
TerminateBufferIO(bufHdr); /* Sync IO finished */
BufferFlushCount++;
/* /* drop refcnt obtained by RelationIdCacheGetRelation */
* If this buffer was marked by someone as DIRTY while we
* were flushing it out we must not clear DIRTY flag -
* vadim 01/17/97
*/
if (!(bufHdr->flags & BM_JUST_DIRTIED))
bufHdr->flags &= ~BM_DIRTY;
/* drop refcnt from RelationIdCacheGetRelation */
if (reln != (Relation) NULL) if (reln != (Relation) NULL)
RelationDecrementReferenceCount(reln); RelationDecrementReferenceCount(reln);
} }
} }
/*
* If we did not write the buffer (because someone else did),
* we must still fsync the file containing it, to ensure that the
* write is down to disk before we commit.
*/
if (! didwrite)
{
#ifndef OPTIMIZE_SINGLE
SpinRelease(BufMgrLock);
#endif /* OPTIMIZE_SINGLE */
reln = RelationIdCacheGetRelation(BufferTagLastDirtied[i].relId.relId);
if (reln == (Relation) NULL)
{
status = smgrblindmarkdirty(DEFAULT_SMGR,
BufferBlindLastDirtied[i].dbname,
BufferBlindLastDirtied[i].relname,
BufferTagLastDirtied[i].relId.dbId,
BufferTagLastDirtied[i].relId.relId,
BufferTagLastDirtied[i].blockNum);
}
else
{
status = smgrmarkdirty(DEFAULT_SMGR, reln,
BufferTagLastDirtied[i].blockNum);
/* drop relcache refcnt incremented by RelationIdCacheGetRelation */
RelationDecrementReferenceCount(reln);
}
#ifndef OPTIMIZE_SINGLE
SpinAcquire(BufMgrLock);
#endif /* OPTIMIZE_SINGLE */
}
BufferDirtiedByMe[i] = false;
SpinRelease(BufMgrLock);
} }
SpinRelease(BufMgrLock);
LocalBufferSync(); LocalBufferSync();
} }
...@@ -1166,13 +1334,19 @@ ResetBufferUsage() ...@@ -1166,13 +1334,19 @@ ResetBufferUsage()
/* ---------------------------------------------- /* ----------------------------------------------
* ResetBufferPool * ResetBufferPool
* *
* this routine is supposed to be called when a transaction aborts. * This routine is supposed to be called when a transaction aborts.
* it will release all the buffer pins held by the transaction. * it will release all the buffer pins held by the transaction.
* Currently, we also call it during commit if BufferPoolCheckLeak
* detected a problem --- in that case, isCommit is TRUE, and we
* only clean up buffer pin counts.
*
* During abort, we also forget any pending fsync requests. Dirtied buffers
* will still get written, eventually, but there will be no fsync for them.
* *
* ---------------------------------------------- * ----------------------------------------------
*/ */
void void
ResetBufferPool() ResetBufferPool(bool isCommit)
{ {
int i; int i;
...@@ -1193,10 +1367,15 @@ ResetBufferPool() ...@@ -1193,10 +1367,15 @@ ResetBufferPool()
SpinRelease(BufMgrLock); SpinRelease(BufMgrLock);
} }
PrivateRefCount[i] = 0; PrivateRefCount[i] = 0;
CommitInfoNeedsSave[i] = 0;
if (! isCommit)
BufferDirtiedByMe[i] = false;
} }
ResetLocalBufferPool(); ResetLocalBufferPool();
if (! isCommit)
smgrabort();
} }
/* ----------------------------------------------- /* -----------------------------------------------
...@@ -1222,7 +1401,7 @@ BufferPoolCheckLeak() ...@@ -1222,7 +1401,7 @@ BufferPoolCheckLeak()
"Buffer Leak: [%03d] (freeNext=%ld, freePrev=%ld, \ "Buffer Leak: [%03d] (freeNext=%ld, freePrev=%ld, \
relname=%s, blockNum=%d, flags=0x%x, refcount=%d %ld)", relname=%s, blockNum=%d, flags=0x%x, refcount=%d %ld)",
i - 1, buf->freeNext, buf->freePrev, i - 1, buf->freeNext, buf->freePrev,
buf->sb_relname, buf->tag.blockNum, buf->flags, buf->blind.relname, buf->tag.blockNum, buf->flags,
buf->refcount, PrivateRefCount[i - 1]); buf->refcount, PrivateRefCount[i - 1]);
result = 1; result = 1;
} }
...@@ -1306,25 +1485,25 @@ BufferGetRelation(Buffer buffer) ...@@ -1306,25 +1485,25 @@ BufferGetRelation(Buffer buffer)
/* /*
* BufferReplace * BufferReplace
* *
* Flush the buffer corresponding to 'bufHdr' * Write out the buffer corresponding to 'bufHdr'
* *
* This routine used to flush the data to disk (ie, force immediate fsync)
* but that's no longer necessary because BufferSync is smarter than before.
*
* BufMgrLock must be held at entry, and the buffer must be pinned.
*/ */
static int static int
BufferReplace(BufferDesc *bufHdr, bool bufferLockHeld) BufferReplace(BufferDesc *bufHdr)
{ {
Relation reln; Relation reln;
Oid bufdb, Oid bufdb,
bufrel; bufrel;
int status; int status;
if (!bufferLockHeld)
SpinAcquire(BufMgrLock);
/* /*
* first try to find the reldesc in the cache, if no luck, don't * first try to find the reldesc in the cache, if no luck, don't
* bother to build the reldesc from scratch, just do a blind write. * bother to build the reldesc from scratch, just do a blind write.
*/ */
bufdb = bufHdr->tag.relId.dbId; bufdb = bufHdr->tag.relId.dbId;
bufrel = bufHdr->tag.relId.relId; bufrel = bufHdr->tag.relId.relId;
...@@ -1336,22 +1515,27 @@ BufferReplace(BufferDesc *bufHdr, bool bufferLockHeld) ...@@ -1336,22 +1515,27 @@ BufferReplace(BufferDesc *bufHdr, bool bufferLockHeld)
/* To check if block content changed while flushing. - vadim 01/17/97 */ /* To check if block content changed while flushing. - vadim 01/17/97 */
bufHdr->flags &= ~BM_JUST_DIRTIED; bufHdr->flags &= ~BM_JUST_DIRTIED;
#ifndef OPTIMIZE_SINGLE
SpinRelease(BufMgrLock); SpinRelease(BufMgrLock);
#endif /* OPTIMIZE_SINGLE */
if (reln != (Relation) NULL) if (reln != (Relation) NULL)
{ {
status = smgrflush(DEFAULT_SMGR, reln, bufHdr->tag.blockNum, status = smgrwrite(DEFAULT_SMGR, reln, bufHdr->tag.blockNum,
(char *) MAKE_PTR(bufHdr->data)); (char *) MAKE_PTR(bufHdr->data));
} }
else else
{ {
/* blind write always flushes */ status = smgrblindwrt(DEFAULT_SMGR, bufHdr->blind.dbname,
status = smgrblindwrt(DEFAULT_SMGR, bufHdr->sb_dbname, bufHdr->blind.relname, bufdb, bufrel,
bufHdr->sb_relname, bufdb, bufrel,
bufHdr->tag.blockNum, bufHdr->tag.blockNum,
(char *) MAKE_PTR(bufHdr->data)); (char *) MAKE_PTR(bufHdr->data));
} }
#ifndef OPTIMIZE_SINGLE
SpinAcquire(BufMgrLock);
#endif /* OPTIMIZE_SINGLE */
/* drop relcache refcnt incremented by RelationIdCacheGetRelation */ /* drop relcache refcnt incremented by RelationIdCacheGetRelation */
if (reln != (Relation) NULL) if (reln != (Relation) NULL)
RelationDecrementReferenceCount(reln); RelationDecrementReferenceCount(reln);
...@@ -1359,6 +1543,11 @@ BufferReplace(BufferDesc *bufHdr, bool bufferLockHeld) ...@@ -1359,6 +1543,11 @@ BufferReplace(BufferDesc *bufHdr, bool bufferLockHeld)
if (status == SM_FAIL) if (status == SM_FAIL)
return FALSE; return FALSE;
/* If we had marked this buffer as needing to be fsync'd, we can forget
* about that, because it's now the storage manager's responsibility.
*/
ClearBufferDirtiedByMe(BufferDescriptorGetBuffer(bufHdr), bufHdr);
BufferFlushCount++; BufferFlushCount++;
return TRUE; return TRUE;
...@@ -1440,7 +1629,7 @@ ReleaseRelationBuffers(Relation rel) ...@@ -1440,7 +1629,7 @@ ReleaseRelationBuffers(Relation rel)
} }
/* Now we can do what we came for */ /* Now we can do what we came for */
buf->flags &= ~ ( BM_DIRTY | BM_JUST_DIRTIED); buf->flags &= ~ ( BM_DIRTY | BM_JUST_DIRTIED);
CommitInfoNeedsSave[i - 1] = 0; ClearBufferDirtiedByMe(i, buf);
/* /*
* Release any refcount we may have. * Release any refcount we may have.
* *
...@@ -1502,6 +1691,7 @@ DropBuffers(Oid dbid) ...@@ -1502,6 +1691,7 @@ DropBuffers(Oid dbid)
} }
/* Now we can do what we came for */ /* Now we can do what we came for */
buf->flags &= ~ ( BM_DIRTY | BM_JUST_DIRTIED); buf->flags &= ~ ( BM_DIRTY | BM_JUST_DIRTIED);
ClearBufferDirtiedByMe(i, buf);
/* /*
* The thing should be free, if caller has checked that * The thing should be free, if caller has checked that
* no backends are running in that database. * no backends are running in that database.
...@@ -1533,7 +1723,7 @@ PrintBufferDescs() ...@@ -1533,7 +1723,7 @@ PrintBufferDescs()
elog(DEBUG, "[%02d] (freeNext=%ld, freePrev=%ld, relname=%s, \ elog(DEBUG, "[%02d] (freeNext=%ld, freePrev=%ld, relname=%s, \
blockNum=%d, flags=0x%x, refcount=%d %ld)", blockNum=%d, flags=0x%x, refcount=%d %ld)",
i, buf->freeNext, buf->freePrev, i, buf->freeNext, buf->freePrev,
buf->sb_relname, buf->tag.blockNum, buf->flags, buf->blind.relname, buf->tag.blockNum, buf->flags,
buf->refcount, PrivateRefCount[i]); buf->refcount, PrivateRefCount[i]);
} }
SpinRelease(BufMgrLock); SpinRelease(BufMgrLock);
...@@ -1544,7 +1734,7 @@ blockNum=%d, flags=0x%x, refcount=%d %ld)", ...@@ -1544,7 +1734,7 @@ blockNum=%d, flags=0x%x, refcount=%d %ld)",
for (i = 0; i < NBuffers; ++i, ++buf) for (i = 0; i < NBuffers; ++i, ++buf)
{ {
printf("[%-2d] (%s, %d) flags=0x%x, refcnt=%d %ld)\n", printf("[%-2d] (%s, %d) flags=0x%x, refcnt=%d %ld)\n",
i, buf->sb_relname, buf->tag.blockNum, i, buf->blind.relname, buf->tag.blockNum,
buf->flags, buf->refcount, PrivateRefCount[i]); buf->flags, buf->refcount, PrivateRefCount[i]);
} }
} }
...@@ -1562,7 +1752,7 @@ PrintPinnedBufs() ...@@ -1562,7 +1752,7 @@ PrintPinnedBufs()
if (PrivateRefCount[i] > 0) if (PrivateRefCount[i] > 0)
elog(NOTICE, "[%02d] (freeNext=%ld, freePrev=%ld, relname=%s, \ elog(NOTICE, "[%02d] (freeNext=%ld, freePrev=%ld, relname=%s, \
blockNum=%d, flags=0x%x, refcount=%d %ld)\n", blockNum=%d, flags=0x%x, refcount=%d %ld)\n",
i, buf->freeNext, buf->freePrev, buf->sb_relname, i, buf->freeNext, buf->freePrev, buf->blind.relname,
buf->tag.blockNum, buf->flags, buf->tag.blockNum, buf->flags,
buf->refcount, PrivateRefCount[i]); buf->refcount, PrivateRefCount[i]);
} }
...@@ -1601,33 +1791,42 @@ BufferPoolBlowaway() ...@@ -1601,33 +1791,42 @@ BufferPoolBlowaway()
* FlushRelationBuffers * FlushRelationBuffers
* *
* This function removes from the buffer pool all pages of a relation * This function removes from the buffer pool all pages of a relation
* that have blocknumber >= specified block. If doFlush is true, * that have blocknumber >= specified block. Pages that are dirty are
* dirty buffers are written out --- otherwise it's an error for any * written out first. If expectDirty is false, a notice is emitted
* of the buffers to be dirty. * warning of dirty buffers, but we proceed anyway. An error code is
* returned if we fail to dump a dirty buffer or if we find one of
* the target pages is pinned into the cache.
* *
* This is used by VACUUM before truncating the relation to the given * This is used by VACUUM before truncating the relation to the given
* number of blocks. For VACUUM, we pass doFlush = false since it would * number of blocks. For VACUUM, we pass expectDirty = false since it
* mean a bug in VACUUM if any of the unwanted pages were still dirty. * could mean a bug in VACUUM if any of the unwanted pages were still
* (TRUNCATE TABLE also uses it in the same way.) * dirty. (TRUNCATE TABLE also uses it in the same way.)
* *
* This is also used by RENAME TABLE (with block = 0 and doFlush = true) * This is also used by RENAME TABLE (with block=0 and expectDirty=true)
* to clear out the buffer cache before renaming the physical files of * to clear out the buffer cache before renaming the physical files of
* a relation. Without that, some other backend might try to do a * a relation. Without that, some other backend might try to do a
* blind write of a buffer page (relying on the sb_relname of the buffer) * blind write of a buffer page (relying on the BlindId of the buffer)
* and fail because it's not got the right filename anymore. * and fail because it's not got the right filename anymore.
* *
* In both cases, the caller should be holding AccessExclusiveLock on * In both cases, the caller should be holding AccessExclusiveLock on
* the target relation to ensure that no other backend is busy reading * the target relation to ensure that no other backend is busy reading
* more blocks of the relation... * more blocks of the relation.
*
* Formerly, we considered it an error condition if we found unexpectedly
* dirty buffers. However, since BufferSync no longer forces out all
* dirty buffers at every xact commit, it's possible for dirty buffers
* to still be present in the cache due to failure of an earlier
* transaction. So, downgrade the error to a mere notice. Maybe we
* shouldn't even emit a notice...
* *
* Returns: 0 - Ok, -1 - DIRTY, -2 - PINNED * Returns: 0 - Ok, -1 - FAILED TO WRITE DIRTY BUFFER, -2 - PINNED
* *
* XXX currently it sequentially searches the buffer pool, should be * XXX currently it sequentially searches the buffer pool, should be
* changed to more clever ways of searching. * changed to more clever ways of searching.
* -------------------------------------------------------------------- * --------------------------------------------------------------------
*/ */
int int
FlushRelationBuffers(Relation rel, BlockNumber block, bool doFlush) FlushRelationBuffers(Relation rel, BlockNumber block, bool expectDirty)
{ {
int i; int i;
BufferDesc *buf; BufferDesc *buf;
...@@ -1642,21 +1841,15 @@ FlushRelationBuffers(Relation rel, BlockNumber block, bool doFlush) ...@@ -1642,21 +1841,15 @@ FlushRelationBuffers(Relation rel, BlockNumber block, bool doFlush)
{ {
if (buf->flags & BM_DIRTY) if (buf->flags & BM_DIRTY)
{ {
if (doFlush) if (! expectDirty)
{
if (FlushBuffer(-i-1, false) != STATUS_OK)
{
elog(NOTICE, "FlushRelationBuffers(%s (local), %u): block %u is dirty, could not flush it",
RelationGetRelationName(rel),
block, buf->tag.blockNum);
return -1;
}
}
else
{
elog(NOTICE, "FlushRelationBuffers(%s (local), %u): block %u is dirty", elog(NOTICE, "FlushRelationBuffers(%s (local), %u): block %u is dirty",
RelationGetRelationName(rel), RelationGetRelationName(rel),
block, buf->tag.blockNum); block, buf->tag.blockNum);
if (FlushBuffer(-i-1, false) != STATUS_OK)
{
elog(NOTICE, "FlushRelationBuffers(%s (local), %u): block %u is dirty, could not flush it",
RelationGetRelationName(rel),
block, buf->tag.blockNum);
return -1; return -1;
} }
} }
...@@ -1676,39 +1869,42 @@ FlushRelationBuffers(Relation rel, BlockNumber block, bool doFlush) ...@@ -1676,39 +1869,42 @@ FlushRelationBuffers(Relation rel, BlockNumber block, bool doFlush)
SpinAcquire(BufMgrLock); SpinAcquire(BufMgrLock);
for (i = 0; i < NBuffers; i++) for (i = 0; i < NBuffers; i++)
{ {
recheck:
buf = &BufferDescriptors[i]; buf = &BufferDescriptors[i];
if (buf->tag.relId.dbId == MyDatabaseId && if (buf->tag.relId.relId == RelationGetRelid(rel) &&
buf->tag.relId.relId == RelationGetRelid(rel) && (buf->tag.relId.dbId == MyDatabaseId ||
buf->tag.relId.dbId == (Oid) NULL) &&
buf->tag.blockNum >= block) buf->tag.blockNum >= block)
{ {
if (buf->flags & BM_DIRTY) if (buf->flags & BM_DIRTY)
{ {
if (doFlush) PinBuffer(buf);
{ SpinRelease(BufMgrLock);
SpinRelease(BufMgrLock); if (! expectDirty)
if (FlushBuffer(i+1, false) != STATUS_OK)
{
elog(NOTICE, "FlushRelationBuffers(%s, %u): block %u is dirty (private %ld, global %d), could not flush it",
buf->sb_relname, block, buf->tag.blockNum,
PrivateRefCount[i], buf->refcount);
return -1;
}
SpinAcquire(BufMgrLock);
}
else
{
SpinRelease(BufMgrLock);
elog(NOTICE, "FlushRelationBuffers(%s, %u): block %u is dirty (private %ld, global %d)", elog(NOTICE, "FlushRelationBuffers(%s, %u): block %u is dirty (private %ld, global %d)",
buf->sb_relname, block, buf->tag.blockNum, RelationGetRelationName(rel), block,
buf->tag.blockNum,
PrivateRefCount[i], buf->refcount);
if (FlushBuffer(i+1, true) != STATUS_OK)
{
elog(NOTICE, "FlushRelationBuffers(%s, %u): block %u is dirty (private %ld, global %d), could not flush it",
RelationGetRelationName(rel), block,
buf->tag.blockNum,
PrivateRefCount[i], buf->refcount); PrivateRefCount[i], buf->refcount);
return -1; return -1;
} }
SpinAcquire(BufMgrLock);
/* Buffer could already be reassigned, so must recheck
* whether it still belongs to rel before freeing it!
*/
goto recheck;
} }
if (!(buf->flags & BM_FREE)) if (!(buf->flags & BM_FREE))
{ {
SpinRelease(BufMgrLock); SpinRelease(BufMgrLock);
elog(NOTICE, "FlushRelationBuffers(%s, %u): block %u is referenced (private %ld, global %d)", elog(NOTICE, "FlushRelationBuffers(%s, %u): block %u is referenced (private %ld, global %d)",
buf->sb_relname, block, buf->tag.blockNum, RelationGetRelationName(rel), block,
buf->tag.blockNum,
PrivateRefCount[i], buf->refcount); PrivateRefCount[i], buf->refcount);
return -2; return -2;
} }
...@@ -1755,11 +1951,6 @@ ReleaseBuffer(Buffer buffer) ...@@ -1755,11 +1951,6 @@ ReleaseBuffer(Buffer buffer)
AddBufferToFreelist(bufHdr); AddBufferToFreelist(bufHdr);
bufHdr->flags |= BM_FREE; bufHdr->flags |= BM_FREE;
} }
if (CommitInfoNeedsSave[buffer - 1])
{
bufHdr->flags |= (BM_DIRTY | BM_JUST_DIRTIED);
CommitInfoNeedsSave[buffer - 1] = 0;
}
SpinRelease(BufMgrLock); SpinRelease(BufMgrLock);
} }
...@@ -1777,7 +1968,7 @@ IncrBufferRefCount_Debug(char *file, int line, Buffer buffer) ...@@ -1777,7 +1968,7 @@ IncrBufferRefCount_Debug(char *file, int line, Buffer buffer)
fprintf(stderr, "PIN(Incr) %ld relname = %s, blockNum = %d, \ fprintf(stderr, "PIN(Incr) %ld relname = %s, blockNum = %d, \
refcount = %ld, file: %s, line: %d\n", refcount = %ld, file: %s, line: %d\n",
buffer, buf->sb_relname, buf->tag.blockNum, buffer, buf->blind.relname, buf->tag.blockNum,
PrivateRefCount[buffer - 1], file, line); PrivateRefCount[buffer - 1], file, line);
} }
} }
...@@ -1795,7 +1986,7 @@ ReleaseBuffer_Debug(char *file, int line, Buffer buffer) ...@@ -1795,7 +1986,7 @@ ReleaseBuffer_Debug(char *file, int line, Buffer buffer)
fprintf(stderr, "UNPIN(Rel) %ld relname = %s, blockNum = %d, \ fprintf(stderr, "UNPIN(Rel) %ld relname = %s, blockNum = %d, \
refcount = %ld, file: %s, line: %d\n", refcount = %ld, file: %s, line: %d\n",
buffer, buf->sb_relname, buf->tag.blockNum, buffer, buf->blind.relname, buf->tag.blockNum,
PrivateRefCount[buffer - 1], file, line); PrivateRefCount[buffer - 1], file, line);
} }
} }
...@@ -1822,7 +2013,7 @@ ReleaseAndReadBuffer_Debug(char *file, ...@@ -1822,7 +2013,7 @@ ReleaseAndReadBuffer_Debug(char *file,
fprintf(stderr, "UNPIN(Rel&Rd) %ld relname = %s, blockNum = %d, \ fprintf(stderr, "UNPIN(Rel&Rd) %ld relname = %s, blockNum = %d, \
refcount = %ld, file: %s, line: %d\n", refcount = %ld, file: %s, line: %d\n",
buffer, buf->sb_relname, buf->tag.blockNum, buffer, buf->blind.relname, buf->tag.blockNum,
PrivateRefCount[buffer - 1], file, line); PrivateRefCount[buffer - 1], file, line);
} }
if (ShowPinTrace && BufferIsLocal(buffer) && is_userbuffer(buffer)) if (ShowPinTrace && BufferIsLocal(buffer) && is_userbuffer(buffer))
...@@ -1831,7 +2022,7 @@ refcount = %ld, file: %s, line: %d\n", ...@@ -1831,7 +2022,7 @@ refcount = %ld, file: %s, line: %d\n",
fprintf(stderr, "PIN(Rel&Rd) %ld relname = %s, blockNum = %d, \ fprintf(stderr, "PIN(Rel&Rd) %ld relname = %s, blockNum = %d, \
refcount = %ld, file: %s, line: %d\n", refcount = %ld, file: %s, line: %d\n",
b, buf->sb_relname, buf->tag.blockNum, b, buf->blind.relname, buf->tag.blockNum,
PrivateRefCount[b - 1], file, line); PrivateRefCount[b - 1], file, line);
} }
return b; return b;
...@@ -1983,11 +2174,43 @@ _bm_die(Oid dbId, Oid relId, int blkNo, int bufNo, ...@@ -1983,11 +2174,43 @@ _bm_die(Oid dbId, Oid relId, int blkNo, int bufNo,
#endif /* BMTRACE */ #endif /* BMTRACE */
/*
* SetBufferCommitInfoNeedsSave
*
* Mark a buffer dirty when we have updated tuple commit-status bits in it.
*
* This is similar to WriteNoReleaseBuffer, except that we do not set
* SharedBufferChanged or BufferDirtiedByMe, because we have not made a
* critical change that has to be flushed to disk before xact commit --- the
* status-bit update could be redone by someone else just as easily. The
* buffer will be marked dirty, but it will not be written to disk until
* there is another reason to write it.
*
* This routine might get called many times on the same page, if we are making
* the first scan after commit of an xact that added/deleted many tuples.
* So, be as quick as we can if the buffer is already dirty.
*/
void void
SetBufferCommitInfoNeedsSave(Buffer buffer) SetBufferCommitInfoNeedsSave(Buffer buffer)
{ {
if (!BufferIsLocal(buffer)) BufferDesc *bufHdr;
CommitInfoNeedsSave[buffer - 1]++;
if (BufferIsLocal(buffer))
return;
if (BAD_BUFFER_ID(buffer))
return;
bufHdr = &BufferDescriptors[buffer - 1];
if ((bufHdr->flags & (BM_DIRTY | BM_JUST_DIRTIED)) !=
(BM_DIRTY | BM_JUST_DIRTIED))
{
SpinAcquire(BufMgrLock);
Assert(bufHdr->refcount > 0);
bufHdr->flags |= (BM_DIRTY | BM_JUST_DIRTIED);
SpinRelease(BufMgrLock);
}
} }
void void
...@@ -2175,7 +2398,16 @@ static void StartBufferIO(BufferDesc *buf, bool forInput) ...@@ -2175,7 +2398,16 @@ static void StartBufferIO(BufferDesc *buf, bool forInput)
Assert(!(buf->flags & BM_IO_IN_PROGRESS)); Assert(!(buf->flags & BM_IO_IN_PROGRESS));
buf->flags |= BM_IO_IN_PROGRESS; buf->flags |= BM_IO_IN_PROGRESS;
#ifdef HAS_TEST_AND_SET #ifdef HAS_TEST_AND_SET
Assert(S_LOCK_FREE(&(buf->io_in_progress_lock))) /*
* There used to be
*
* Assert(S_LOCK_FREE(&(buf->io_in_progress_lock)));
*
* here, but that's wrong because of the way WaitIO works: someone else
* waiting for the I/O to complete will succeed in grabbing the lock for
* a few instructions, and if we context-swap back to here the Assert
* could fail. Tiny window for failure, but I've seen it happen -- tgl
*/
S_LOCK(&(buf->io_in_progress_lock)); S_LOCK(&(buf->io_in_progress_lock));
#endif /* HAS_TEST_AND_SET */ #endif /* HAS_TEST_AND_SET */
InProgressBuf = buf; InProgressBuf = buf;
...@@ -2217,7 +2449,7 @@ static void ContinueBufferIO(BufferDesc *buf, bool forInput) ...@@ -2217,7 +2449,7 @@ static void ContinueBufferIO(BufferDesc *buf, bool forInput)
IsForInput = forInput; IsForInput = forInput;
} }
extern void InitBufferIO(void) void InitBufferIO(void)
{ {
InProgressBuf = (BufferDesc *)0; InProgressBuf = (BufferDesc *)0;
} }
...@@ -2229,7 +2461,7 @@ extern void InitBufferIO(void) ...@@ -2229,7 +2461,7 @@ extern void InitBufferIO(void)
* set in case of output, this routine would kill all * set in case of output, this routine would kill all
* backends and reset postmaster. * backends and reset postmaster.
*/ */
extern void AbortBufferIO(void) void AbortBufferIO(void)
{ {
BufferDesc *buf = InProgressBuf; BufferDesc *buf = InProgressBuf;
if (buf) if (buf)
...@@ -2252,8 +2484,8 @@ extern void AbortBufferIO(void) ...@@ -2252,8 +2484,8 @@ extern void AbortBufferIO(void)
buf->flags |= BM_DIRTY; buf->flags |= BM_DIRTY;
} }
buf->flags |= BM_IO_ERROR; buf->flags |= BM_IO_ERROR;
TerminateBufferIO(buf);
buf->flags &= ~BM_IO_IN_PROGRESS; buf->flags &= ~BM_IO_IN_PROGRESS;
TerminateBufferIO(buf);
SpinRelease(BufMgrLock); SpinRelease(BufMgrLock);
} }
} }
...@@ -9,7 +9,7 @@ ...@@ -9,7 +9,7 @@
* *
* *
* IDENTIFICATION * IDENTIFICATION
* $Header: /cvsroot/pgsql/src/backend/storage/buffer/freelist.c,v 1.20 2000/01/26 05:56:52 momjian Exp $ * $Header: /cvsroot/pgsql/src/backend/storage/buffer/freelist.c,v 1.21 2000/04/09 04:43:19 tgl Exp $
* *
*------------------------------------------------------------------------- *-------------------------------------------------------------------------
*/ */
...@@ -122,7 +122,7 @@ PinBuffer_Debug(char *file, int line, BufferDesc *buf) ...@@ -122,7 +122,7 @@ PinBuffer_Debug(char *file, int line, BufferDesc *buf)
fprintf(stderr, "PIN(Pin) %ld relname = %s, blockNum = %d, \ fprintf(stderr, "PIN(Pin) %ld relname = %s, blockNum = %d, \
refcount = %ld, file: %s, line: %d\n", refcount = %ld, file: %s, line: %d\n",
buffer, buf->sb_relname, buf->tag.blockNum, buffer, buf->blind.relname, buf->tag.blockNum,
PrivateRefCount[buffer - 1], file, line); PrivateRefCount[buffer - 1], file, line);
} }
} }
...@@ -168,7 +168,7 @@ UnpinBuffer_Debug(char *file, int line, BufferDesc *buf) ...@@ -168,7 +168,7 @@ UnpinBuffer_Debug(char *file, int line, BufferDesc *buf)
fprintf(stderr, "UNPIN(Unpin) %ld relname = %s, blockNum = %d, \ fprintf(stderr, "UNPIN(Unpin) %ld relname = %s, blockNum = %d, \
refcount = %ld, file: %s, line: %d\n", refcount = %ld, file: %s, line: %d\n",
buffer, buf->sb_relname, buf->tag.blockNum, buffer, buf->blind.relname, buf->tag.blockNum,
PrivateRefCount[buffer - 1], file, line); PrivateRefCount[buffer - 1], file, line);
} }
} }
...@@ -304,7 +304,7 @@ PrintBufferFreeList() ...@@ -304,7 +304,7 @@ PrintBufferFreeList()
int i = (buf - BufferDescriptors); int i = (buf - BufferDescriptors);
printf("[%-2d] (%s, %d) flags=0x%x, refcnt=%d %ld, nxt=%ld prv=%ld)\n", printf("[%-2d] (%s, %d) flags=0x%x, refcnt=%d %ld, nxt=%ld prv=%ld)\n",
i, buf->sb_relname, buf->tag.blockNum, i, buf->blind.relname, buf->tag.blockNum,
buf->flags, buf->refcount, PrivateRefCount[i], buf->flags, buf->refcount, PrivateRefCount[i],
buf->freeNext, buf->freePrev); buf->freeNext, buf->freePrev);
......
...@@ -7,7 +7,7 @@ ...@@ -7,7 +7,7 @@
* Portions Copyright (c) 1994, Regents of the University of California * Portions Copyright (c) 1994, Regents of the University of California
* *
* IDENTIFICATION * IDENTIFICATION
* $Header: /cvsroot/pgsql/src/backend/storage/file/fd.c,v 1.54 2000/03/17 02:36:19 tgl Exp $ * $Header: /cvsroot/pgsql/src/backend/storage/file/fd.c,v 1.55 2000/04/09 04:43:19 tgl Exp $
* *
* NOTES: * NOTES:
* *
...@@ -293,7 +293,7 @@ LruDelete(File file) ...@@ -293,7 +293,7 @@ LruDelete(File file)
vfdP->seekPos = (long) lseek(vfdP->fd, 0L, SEEK_CUR); vfdP->seekPos = (long) lseek(vfdP->fd, 0L, SEEK_CUR);
Assert(vfdP->seekPos != -1); Assert(vfdP->seekPos != -1);
/* if we have written to the file, sync it */ /* if we have written to the file, sync it before closing */
if (vfdP->fdstate & FD_DIRTY) if (vfdP->fdstate & FD_DIRTY)
{ {
returnValue = pg_fsync(vfdP->fd); returnValue = pg_fsync(vfdP->fd);
...@@ -381,9 +381,6 @@ tryAgain: ...@@ -381,9 +381,6 @@ tryAgain:
returnValue = lseek(vfdP->fd, vfdP->seekPos, SEEK_SET); returnValue = lseek(vfdP->fd, vfdP->seekPos, SEEK_SET);
Assert(returnValue != -1); Assert(returnValue != -1);
} }
/* Update state as appropriate for re-open (needed?) */
vfdP->fdstate &= ~FD_DIRTY;
} }
/* /*
...@@ -804,7 +801,7 @@ FileWrite(File file, char *buffer, int amount) ...@@ -804,7 +801,7 @@ FileWrite(File file, char *buffer, int amount)
if (returnCode > 0) if (returnCode > 0)
VfdCache[file].seekPos += returnCode; VfdCache[file].seekPos += returnCode;
/* record the write */ /* mark the file as needing fsync */
VfdCache[file].fdstate |= FD_DIRTY; VfdCache[file].fdstate |= FD_DIRTY;
return returnCode; return returnCode;
...@@ -873,6 +870,35 @@ FileTruncate(File file, long offset) ...@@ -873,6 +870,35 @@ FileTruncate(File file, long offset)
return returnCode; return returnCode;
} }
/*
* FileSync --- if a file is marked as dirty, fsync it.
*
* The FD_DIRTY bit is slightly misnamed: it doesn't mean that we need to
* write the file, but that we *have* written it and need to execute an
* fsync() to ensure the changes are down on disk before we mark the current
* transaction committed.
*
* FD_DIRTY is set by FileWrite or by an explicit FileMarkDirty() call.
* It is cleared after successfully fsync'ing the file. FileClose() will
* fsync a dirty File that is about to be closed, since there will be no
* other place to remember the need to fsync after the VFD is gone.
*
* Note that the DIRTY bit is logically associated with the actual disk file,
* not with any particular kernel FD we might have open for it. We assume
* that fsync will force out any dirty buffers for that file, whether or not
* they were written through the FD being used for the fsync call --- they
* might even have been written by some other backend!
*
* Note also that LruDelete currently fsyncs a dirty file that it is about
* to close the kernel file descriptor for. The idea there is to avoid
* having to re-open the kernel descriptor later. But it's not really clear
* that this is a performance win; we could end up fsyncing the same file
* multiple times in a transaction, which would probably cost more time
* than is saved by avoiding an open() call. This should be studied.
*
* This routine used to think it could skip the fsync if the file is
* physically closed, but that is now WRONG; see comments for FileMarkDirty.
*/
int int
FileSync(File file) FileSync(File file)
{ {
...@@ -880,23 +906,66 @@ FileSync(File file) ...@@ -880,23 +906,66 @@ FileSync(File file)
Assert(FileIsValid(file)); Assert(FileIsValid(file));
/* if (!(VfdCache[file].fdstate & FD_DIRTY))
* If the file isn't open, then we don't need to sync it; we always {
* sync files when we close them. Also, if we haven't done any writes /* Need not sync if file is not dirty. */
* that we haven't already synced, we can ignore the request.
*/
if (VfdCache[file].fd < 0 || !(VfdCache[file].fdstate & FD_DIRTY))
returnCode = 0; returnCode = 0;
else }
else if (disableFsync)
{ {
returnCode = pg_fsync(VfdCache[file].fd); /* Don't force the file open if pg_fsync isn't gonna sync it. */
returnCode = 0;
VfdCache[file].fdstate &= ~FD_DIRTY; VfdCache[file].fdstate &= ~FD_DIRTY;
} }
else
{
/* We don't use FileAccess() because we don't want to force the
* file to the front of the LRU ring; we aren't expecting to
* access it again soon.
*/
if (FileIsNotOpen(file))
{
returnCode = LruInsert(file);
if (returnCode != 0)
return returnCode;
}
returnCode = pg_fsync(VfdCache[file].fd);
if (returnCode == 0)
VfdCache[file].fdstate &= ~FD_DIRTY;
}
return returnCode; return returnCode;
} }
/*
 * FileMarkDirty --- flag a file as requiring an fsync at transaction commit.
 *
 * FileWrite already sets the dirty flag, so ordinary callers never need
 * this routine.  The buffer manager calls it when it finds that another
 * backend has written out a shared buffer that this backend dirtied (but
 * did not itself write) during the current xact.  In that case this backend
 * must fsync the file before it may commit: we cannot assume the other
 * backend has fsync'd the file yet, and doing our own fsync ensures that
 * (a) the disk page really is written and (b) this backend's commit is
 * held off until that write is complete.
 *
 * This relies on the assumption that an fsync issued by this backend will
 * write kernel disk buffers that were dirtied by a different backend.
 * Whether we currently have the file physically open is irrelevant; the
 * fsync must be done even if the file has to be re-opened to do it.
 */
void
FileMarkDirty(File file)
{
	Assert(FileIsValid(file));

	DO_DB(elog(DEBUG, "FileMarkDirty: %d (%s)",
			   file, VfdCache[file].fileName));

	/* Only the flag is set here; no fsync is issued by this routine. */
	VfdCache[file].fdstate |= FD_DIRTY;
}
/* /*
* Routines that want to use stdio (ie, FILE*) should use AllocateFile * Routines that want to use stdio (ie, FILE*) should use AllocateFile
* rather than plain fopen(). This lets fd.c deal with freeing FDs if * rather than plain fopen(). This lets fd.c deal with freeing FDs if
...@@ -992,6 +1061,12 @@ closeAllVfds() ...@@ -992,6 +1061,12 @@ closeAllVfds()
* exit (it doesn't particularly care which). All still-open temporary-file * exit (it doesn't particularly care which). All still-open temporary-file
* VFDs are closed, which also causes the underlying files to be deleted. * VFDs are closed, which also causes the underlying files to be deleted.
* Furthermore, all "allocated" stdio files are closed. * Furthermore, all "allocated" stdio files are closed.
*
* This routine is not involved in fsync'ing non-temporary files at xact
* commit; that is done by FileSync under control of the buffer manager.
* During a commit, that is done *before* control gets here. If we still
* have any needs-fsync bits set when we get here, we assume this is abort
* and clear them.
*/ */
void void
AtEOXact_Files(void) AtEOXact_Files(void)
...@@ -1006,6 +1081,8 @@ AtEOXact_Files(void) ...@@ -1006,6 +1081,8 @@ AtEOXact_Files(void)
if ((VfdCache[i].fdstate & FD_TEMPORARY) && if ((VfdCache[i].fdstate & FD_TEMPORARY) &&
VfdCache[i].fileName != NULL) VfdCache[i].fileName != NULL)
FileClose(i); FileClose(i);
else
VfdCache[i].fdstate &= ~FD_DIRTY;
} }
} }
......
...@@ -8,7 +8,7 @@ ...@@ -8,7 +8,7 @@
* *
* *
* IDENTIFICATION * IDENTIFICATION
* $Header: /cvsroot/pgsql/src/backend/storage/smgr/md.c,v 1.64 2000/02/07 02:38:18 inoue Exp $ * $Header: /cvsroot/pgsql/src/backend/storage/smgr/md.c,v 1.65 2000/04/09 04:43:20 tgl Exp $
* *
*------------------------------------------------------------------------- *-------------------------------------------------------------------------
*/ */
...@@ -48,7 +48,12 @@ ...@@ -48,7 +48,12 @@
typedef struct _MdfdVec typedef struct _MdfdVec
{ {
int mdfd_vfd; /* fd number in vfd pool */ int mdfd_vfd; /* fd number in vfd pool */
uint16 mdfd_flags; /* clean, dirty, free */ int mdfd_flags; /* free, temporary */
/* these are the assigned bits in mdfd_flags: */
#define MDFD_FREE (1 << 0)/* unused entry */
#define MDFD_TEMP (1 << 1)/* close this entry at transaction end */
int mdfd_lstbcnt; /* most recent block count */ int mdfd_lstbcnt; /* most recent block count */
int mdfd_nextFree; /* next free vector */ int mdfd_nextFree; /* next free vector */
#ifndef LET_OS_MANAGE_FILESIZE #ifndef LET_OS_MANAGE_FILESIZE
...@@ -62,13 +67,13 @@ static int Md_Free = -1; /* head of freelist of unused fdvec entries */ ...@@ -62,13 +67,13 @@ static int Md_Free = -1; /* head of freelist of unused fdvec entries */
static int CurFd = 0; /* first never-used fdvec index */ static int CurFd = 0; /* first never-used fdvec index */
static MemoryContext MdCxt; /* context for all my allocations */ static MemoryContext MdCxt; /* context for all my allocations */
#define MDFD_DIRTY (uint16) 0x01
#define MDFD_FREE (uint16) 0x02
/* routines declared here */ /* routines declared here */
static void mdclose_fd(int fd);
static int _mdfd_getrelnfd(Relation reln); static int _mdfd_getrelnfd(Relation reln);
static MdfdVec *_mdfd_openseg(Relation reln, int segno, int oflags); static MdfdVec *_mdfd_openseg(Relation reln, int segno, int oflags);
static MdfdVec *_mdfd_getseg(Relation reln, int blkno); static MdfdVec *_mdfd_getseg(Relation reln, int blkno);
static MdfdVec *_mdfd_blind_getseg(char *dbname, char *relname,
Oid dbid, Oid relid, int blkno);
static int _fdvec_alloc(void); static int _fdvec_alloc(void);
static void _fdvec_free(int); static void _fdvec_free(int);
static BlockNumber _mdnblocks(File file, Size blcksz); static BlockNumber _mdnblocks(File file, Size blcksz);
...@@ -186,6 +191,8 @@ mdcreate(Relation reln) ...@@ -186,6 +191,8 @@ mdcreate(Relation reln)
#endif #endif
Md_fdvec[vfd].mdfd_lstbcnt = 0; Md_fdvec[vfd].mdfd_lstbcnt = 0;
pfree(path);
return vfd; return vfd;
} }
...@@ -290,9 +297,6 @@ mdextend(Relation reln, char *buffer) ...@@ -290,9 +297,6 @@ mdextend(Relation reln, char *buffer)
return SM_FAIL; return SM_FAIL;
} }
/* remember that we did a write, so we can sync at xact commit */
v->mdfd_flags |= MDFD_DIRTY;
/* try to keep the last block count current, though it's just a hint */ /* try to keep the last block count current, though it's just a hint */
#ifndef LET_OS_MANAGE_FILESIZE #ifndef LET_OS_MANAGE_FILESIZE
if ((v->mdfd_lstbcnt = (++nblocks % RELSEG_SIZE)) == 0) if ((v->mdfd_lstbcnt = (++nblocks % RELSEG_SIZE)) == 0)
...@@ -367,6 +371,8 @@ mdopen(Relation reln) ...@@ -367,6 +371,8 @@ mdopen(Relation reln)
#endif #endif
#endif #endif
pfree(path);
return vfd; return vfd;
} }
...@@ -382,13 +388,24 @@ int ...@@ -382,13 +388,24 @@ int
mdclose(Relation reln) mdclose(Relation reln)
{ {
int fd; int fd;
MdfdVec *v;
MemoryContext oldcxt;
fd = RelationGetFile(reln); fd = RelationGetFile(reln);
if (fd < 0) if (fd < 0)
return SM_SUCCESS; /* already closed, so no work */ return SM_SUCCESS; /* already closed, so no work */
mdclose_fd(fd);
reln->rd_fd = -1;
return SM_SUCCESS;
}
static void
mdclose_fd(int fd)
{
MdfdVec *v;
MemoryContext oldcxt;
oldcxt = MemoryContextSwitchTo(MdCxt); oldcxt = MemoryContextSwitchTo(MdCxt);
#ifndef LET_OS_MANAGE_FILESIZE #ifndef LET_OS_MANAGE_FILESIZE
for (v = &Md_fdvec[fd]; v != (MdfdVec *) NULL;) for (v = &Md_fdvec[fd]; v != (MdfdVec *) NULL;)
...@@ -398,17 +415,14 @@ mdclose(Relation reln) ...@@ -398,17 +415,14 @@ mdclose(Relation reln)
/* if not closed already */ /* if not closed already */
if (v->mdfd_vfd >= 0) if (v->mdfd_vfd >= 0)
{ {
/* /*
* We sync the file descriptor so that we don't need to reopen * We sync the file descriptor so that we don't need to reopen
* it at transaction commit to force changes to disk. * it at transaction commit to force changes to disk. (This
* is not really optional, because we are about to forget that
* the file even exists...)
*/ */
FileSync(v->mdfd_vfd); FileSync(v->mdfd_vfd);
FileClose(v->mdfd_vfd); FileClose(v->mdfd_vfd);
/* mark this file descriptor as clean in our private table */
v->mdfd_flags &= ~MDFD_DIRTY;
} }
/* Now free vector */ /* Now free vector */
v = v->mdfd_chain; v = v->mdfd_chain;
...@@ -423,28 +437,20 @@ mdclose(Relation reln) ...@@ -423,28 +437,20 @@ mdclose(Relation reln)
{ {
if (v->mdfd_vfd >= 0) if (v->mdfd_vfd >= 0)
{ {
/* /*
* We sync the file descriptor so that we don't need to reopen * We sync the file descriptor so that we don't need to reopen
* it at transaction commit to force changes to disk. * it at transaction commit to force changes to disk. (This
* is not really optional, because we are about to forget that
* the file even exists...)
*/ */
FileSync(v->mdfd_vfd); FileSync(v->mdfd_vfd);
FileClose(v->mdfd_vfd); FileClose(v->mdfd_vfd);
/* mark this file descriptor as clean in our private table */
v->mdfd_flags &= ~MDFD_DIRTY;
} }
} }
#endif #endif
MemoryContextSwitchTo(oldcxt); MemoryContextSwitchTo(oldcxt);
_fdvec_free(fd); _fdvec_free(fd);
/* be sure to mark relation closed */
reln->rd_fd = -1;
return SM_SUCCESS;
} }
/* /*
...@@ -521,8 +527,6 @@ mdwrite(Relation reln, BlockNumber blocknum, char *buffer) ...@@ -521,8 +527,6 @@ mdwrite(Relation reln, BlockNumber blocknum, char *buffer)
if (FileWrite(v->mdfd_vfd, buffer, BLCKSZ) != BLCKSZ) if (FileWrite(v->mdfd_vfd, buffer, BLCKSZ) != BLCKSZ)
status = SM_FAIL; status = SM_FAIL;
v->mdfd_flags |= MDFD_DIRTY;
return status; return status;
} }
...@@ -560,14 +564,6 @@ mdflush(Relation reln, BlockNumber blocknum, char *buffer) ...@@ -560,14 +564,6 @@ mdflush(Relation reln, BlockNumber blocknum, char *buffer)
|| FileSync(v->mdfd_vfd) < 0) || FileSync(v->mdfd_vfd) < 0)
status = SM_FAIL; status = SM_FAIL;
/*
* By here, the block is written and changes have been forced to
* stable storage. Mark the descriptor as clean until the next write,
* so we don't sync it again unnecessarily at transaction commit.
*/
v->mdfd_flags &= ~MDFD_DIRTY;
return status; return status;
} }
...@@ -575,139 +571,87 @@ mdflush(Relation reln, BlockNumber blocknum, char *buffer) ...@@ -575,139 +571,87 @@ mdflush(Relation reln, BlockNumber blocknum, char *buffer)
* mdblindwrt() -- Write a block to disk blind. * mdblindwrt() -- Write a block to disk blind.
* *
* We have to be able to do this using only the name and OID of * We have to be able to do this using only the name and OID of
* the database and relation in which the block belongs. This * the database and relation in which the block belongs. Otherwise
* is a synchronous write. * this is just like mdwrite().
*/ */
int int
mdblindwrt(char *dbstr, mdblindwrt(char *dbname,
char *relstr, char *relname,
Oid dbid, Oid dbid,
Oid relid, Oid relid,
BlockNumber blkno, BlockNumber blkno,
char *buffer) char *buffer)
{ {
int fd;
int segno;
long seekpos;
int status; int status;
char *path; long seekpos;
MdfdVec *v;
#ifndef LET_OS_MANAGE_FILESIZE
int nchars;
/* be sure we have enough space for the '.segno', if any */
segno = blkno / RELSEG_SIZE;
if (segno > 0)
nchars = 10;
else
nchars = 0;
/* construct the path to the file and open it */ v = _mdfd_blind_getseg(dbname, relname, dbid, relid, blkno);
/* system table? then put in system area... */
if (dbid == (Oid) 0)
{
path = (char *) palloc(strlen(DataDir) + sizeof(NameData) + 2 + nchars);
if (segno == 0)
sprintf(path, "%s/%s", DataDir, relstr);
else
sprintf(path, "%s/%s.%d", DataDir, relstr, segno);
}
/* user table? then put in user database area... */
else if (dbid == MyDatabaseId)
{
path = (char *) palloc(strlen(DatabasePath) + 2 * sizeof(NameData) + 2 + nchars);
if (segno == 0)
sprintf(path, "%s%c%s", DatabasePath, SEP_CHAR, relstr);
else
sprintf(path, "%s%c%s.%d", DatabasePath, SEP_CHAR, relstr, segno);
}
else
/* this is work arround only !!! */
{
char dbpath[MAXPGPATH];
Oid id;
char *tmpPath;
GetRawDatabaseInfo(dbstr, &id, dbpath);
if (id != dbid)
elog(FATAL, "mdblindwrt: oid of db %s is not %u", dbstr, dbid);
tmpPath = ExpandDatabasePath(dbpath);
if (tmpPath == NULL)
elog(FATAL, "mdblindwrt: can't expand path for db %s", dbstr);
path = (char *) palloc(strlen(tmpPath) + 2 * sizeof(NameData) + 2 + nchars);
if (segno == 0)
sprintf(path, "%s%c%s", tmpPath, SEP_CHAR, relstr);
else
sprintf(path, "%s%c%s.%d", tmpPath, SEP_CHAR, relstr, segno);
pfree(tmpPath);
}
#else
/* construct the path to the file and open it */
/* system table? then put in system area... */
if (dbid == (Oid) 0)
{
path = (char *) palloc(strlen(DataDir) + sizeof(NameData) + 2);
sprintf(path, "%s/%s", DataDir, relstr);
}
/* user table? then put in user database area... */
else if (dbid == MyDatabaseId)
{
path = (char *) palloc(strlen(DatabasePath) + 2 * sizeof(NameData) + 2);
sprintf(path, "%s%c%s", DatabasePath, SEP_CHAR, relstr);
}
else
/* this is work arround only !!! */
{
char dbpath[MAXPGPATH];
Oid id;
char *tmpPath;
GetRawDatabaseInfo(dbstr, &id, dbpath);
if (id != dbid)
elog(FATAL, "mdblindwrt: oid of db %s is not %u", dbstr, dbid);
tmpPath = ExpandDatabasePath(dbpath);
if (tmpPath == NULL)
elog(FATAL, "mdblindwrt: can't expand path for db %s", dbstr);
path = (char *) palloc(strlen(tmpPath) + 2 * sizeof(NameData) + 2);
sprintf(path, "%s%c%s", tmpPath, SEP_CHAR, relstr);
pfree(tmpPath);
}
#endif
#ifndef __CYGWIN32__ if (v == NULL)
if ((fd = open(path, O_RDWR, 0600)) < 0)
#else
if ((fd = open(path, O_RDWR | O_BINARY, 0600)) < 0)
#endif
return SM_FAIL; return SM_FAIL;
/* seek to the right spot */
#ifndef LET_OS_MANAGE_FILESIZE #ifndef LET_OS_MANAGE_FILESIZE
seekpos = (long) (BLCKSZ * (blkno % RELSEG_SIZE)); seekpos = (long) (BLCKSZ * (blkno % RELSEG_SIZE));
#ifdef DIAGNOSTIC
if (seekpos >= BLCKSZ * RELSEG_SIZE)
elog(FATAL, "seekpos too big!");
#endif
#else #else
seekpos = (long) (BLCKSZ * (blkno)); seekpos = (long) (BLCKSZ * (blkno));
#endif #endif
if (lseek(fd, seekpos, SEEK_SET) != seekpos) if (FileSeek(v->mdfd_vfd, seekpos, SEEK_SET) != seekpos)
{
close(fd);
return SM_FAIL; return SM_FAIL;
}
status = SM_SUCCESS; status = SM_SUCCESS;
if (FileWrite(v->mdfd_vfd, buffer, BLCKSZ) != BLCKSZ)
/* write and sync the block */
if (write(fd, buffer, BLCKSZ) != BLCKSZ || (pg_fsync(fd) < 0))
status = SM_FAIL; status = SM_FAIL;
if (close(fd) < 0) return status;
status = SM_FAIL; }
pfree(path); /*
* mdmarkdirty() -- Mark the specified block "dirty" (ie, needs fsync).
*
* Returns SM_SUCCESS or SM_FAIL.
*/
int
mdmarkdirty(Relation reln, BlockNumber blkno)
{
MdfdVec *v;
/*
 * Look up the segment descriptor covering blkno and flag its file as
 * needing an fsync.
 * NOTE(review): unlike mdblindmarkdirty(), the lookup result is not
 * checked for NULL before being dereferenced -- confirm that
 * _mdfd_getseg() cannot return NULL to its caller.
 */
return status; v = _mdfd_getseg(reln, blkno);
FileMarkDirty(v->mdfd_vfd);
/* no failure is reported from this path; SM_SUCCESS is always returned */
return SM_SUCCESS;
}
/*
* mdblindmarkdirty() -- Mark the specified block "dirty" (ie, needs fsync).
*
* We have to be able to do this using only the name and OID of
* the database and relation in which the block belongs. Otherwise
* this is just like mdmarkdirty().
*
* Returns SM_SUCCESS once the file has been marked, or SM_FAIL if
* _mdfd_blind_getseg() could not supply a segment descriptor for blkno.
*/
int
mdblindmarkdirty(char *dbname,
char *relname,
Oid dbid,
Oid relid,
BlockNumber blkno)
{
MdfdVec *v;
/* obtain a descriptor for the segment holding blkno without a Relation */
v = _mdfd_blind_getseg(dbname, relname, dbid, relid, blkno);
if (v == NULL)
return SM_FAIL;
/* flag the underlying file so that it gets fsync'd later */
FileMarkDirty(v->mdfd_vfd);
return SM_SUCCESS;
} }
/* /*
...@@ -873,19 +817,26 @@ mdcommit() ...@@ -873,19 +817,26 @@ mdcommit()
for (i = 0; i < CurFd; i++) for (i = 0; i < CurFd; i++)
{ {
v = &Md_fdvec[i];
if (v->mdfd_flags & MDFD_FREE)
continue;
if (v->mdfd_flags & MDFD_TEMP)
{
/* Sync and close the file */
mdclose_fd(i);
}
else
{
/* Sync, but keep the file entry */
#ifndef LET_OS_MANAGE_FILESIZE #ifndef LET_OS_MANAGE_FILESIZE
for (v = &Md_fdvec[i]; v != (MdfdVec *) NULL; v = v->mdfd_chain) for ( ; v != (MdfdVec *) NULL; v = v->mdfd_chain)
#else #else
v = &Md_fdvec[i]; if (v != (MdfdVec *) NULL)
if (v != (MdfdVec *) NULL)
#endif #endif
{
if (v->mdfd_flags & MDFD_DIRTY)
{ {
if (FileSync(v->mdfd_vfd) < 0) if (FileSync(v->mdfd_vfd) < 0)
return SM_FAIL; return SM_FAIL;
v->mdfd_flags &= ~MDFD_DIRTY;
} }
} }
} }
...@@ -908,13 +859,14 @@ mdabort() ...@@ -908,13 +859,14 @@ mdabort()
for (i = 0; i < CurFd; i++) for (i = 0; i < CurFd; i++)
{ {
#ifndef LET_OS_MANAGE_FILESIZE
for (v = &Md_fdvec[i]; v != (MdfdVec *) NULL; v = v->mdfd_chain)
v->mdfd_flags &= ~MDFD_DIRTY;
#else
v = &Md_fdvec[i]; v = &Md_fdvec[i];
v->mdfd_flags &= ~MDFD_DIRTY; if (v->mdfd_flags & MDFD_FREE)
#endif continue;
if (v->mdfd_flags & MDFD_TEMP)
{
/* Close the file */
mdclose_fd(i);
}
} }
return SM_SUCCESS; return SM_SUCCESS;
...@@ -995,7 +947,6 @@ _fdvec_free(int fdvec) ...@@ -995,7 +947,6 @@ _fdvec_free(int fdvec)
Md_fdvec[fdvec].mdfd_nextFree = Md_Free; Md_fdvec[fdvec].mdfd_nextFree = Md_Free;
Md_fdvec[fdvec].mdfd_flags = MDFD_FREE; Md_fdvec[fdvec].mdfd_flags = MDFD_FREE;
Md_Free = fdvec; Md_Free = fdvec;
} }
static MdfdVec * static MdfdVec *
...@@ -1004,19 +955,17 @@ _mdfd_openseg(Relation reln, int segno, int oflags) ...@@ -1004,19 +955,17 @@ _mdfd_openseg(Relation reln, int segno, int oflags)
MemoryContext oldcxt; MemoryContext oldcxt;
MdfdVec *v; MdfdVec *v;
int fd; int fd;
bool dofree;
char *path, char *path,
*fullpath; *fullpath;
/* be sure we have enough space for the '.segno', if any */ /* be sure we have enough space for the '.segno', if any */
path = relpath(RelationGetPhysicalRelationName(reln)); path = relpath(RelationGetPhysicalRelationName(reln));
dofree = false;
if (segno > 0) if (segno > 0)
{ {
dofree = true;
fullpath = (char *) palloc(strlen(path) + 12); fullpath = (char *) palloc(strlen(path) + 12);
sprintf(fullpath, "%s.%d", path, segno); sprintf(fullpath, "%s.%d", path, segno);
pfree(path);
} }
else else
fullpath = path; fullpath = path;
...@@ -1028,8 +977,7 @@ _mdfd_openseg(Relation reln, int segno, int oflags) ...@@ -1028,8 +977,7 @@ _mdfd_openseg(Relation reln, int segno, int oflags)
fd = FileNameOpenFile(fullpath, O_RDWR | O_BINARY | oflags, 0600); fd = FileNameOpenFile(fullpath, O_RDWR | O_BINARY | oflags, 0600);
#endif #endif
if (dofree) pfree(fullpath);
pfree(fullpath);
if (fd < 0) if (fd < 0)
return (MdfdVec *) NULL; return (MdfdVec *) NULL;
...@@ -1109,6 +1057,104 @@ _mdfd_getseg(Relation reln, int blkno) ...@@ -1109,6 +1057,104 @@ _mdfd_getseg(Relation reln, int blkno)
return v; return v;
} }
/*
 * _mdfd_blind_getseg() -- Find the segment of the relation holding the
 *		specified block.
 *
 * This is the same as _mdfd_getseg() except that we must work
 * "blind" with no Relation struct: the file is located purely from the
 * database/relation names and OIDs via relpath_blind().
 *
 * NOTE: we have no easy way to tell whether a FD already exists for the
 * target relation, so we always make a new one.  This should probably
 * be improved somehow, but I doubt it's a significant performance issue
 * under normal circumstances.  The FD is marked to be closed at end of xact
 * (MDFD_TEMP) so that we don't accumulate a lot of dead FDs.
 *
 * Returns the MdfdVec entry for the segment containing blkno, or NULL if
 * the base file or any segment file up to the target cannot be opened, or
 * if no Md_fdvec slot can be allocated.
 *
 * The path string is released on every exit path, and a base file that
 * was opened but could not be entered into Md_fdvec is closed again, so a
 * failure here leaves nothing behind except MDFD_TEMP entries already
 * linked into Md_fdvec.
 */
static MdfdVec *
_mdfd_blind_getseg(char *dbname, char *relname, Oid dbid, Oid relid,
				   int blkno)
{
	MdfdVec    *v;
	char	   *path;
	int			fd;
	int			vfd;

#ifndef LET_OS_MANAGE_FILESIZE
	int			segno;
	int			targsegno;
#endif

	/* construct the path to the file and open it */
	path = relpath_blind(dbname, relname, dbid, relid);

#ifndef __CYGWIN32__
	fd = FileNameOpenFile(path, O_RDWR, 0600);
#else
	fd = FileNameOpenFile(path, O_RDWR | O_BINARY, 0600);
#endif

	if (fd < 0)
	{
		pfree(path);
		return NULL;
	}

	vfd = _fdvec_alloc();
	if (vfd < 0)
	{
		/* no table entry will ever own this file, so close it here */
		FileClose(fd);
		pfree(path);
		return NULL;
	}

	/* fill in the head entry for segment zero */
	Md_fdvec[vfd].mdfd_vfd = fd;
	Md_fdvec[vfd].mdfd_flags = MDFD_TEMP;
	Md_fdvec[vfd].mdfd_lstbcnt = _mdnblocks(fd, BLCKSZ);

#ifndef LET_OS_MANAGE_FILESIZE
	Md_fdvec[vfd].mdfd_chain = (MdfdVec *) NULL;

#ifdef DIAGNOSTIC
	if (Md_fdvec[vfd].mdfd_lstbcnt > RELSEG_SIZE)
		elog(FATAL, "segment too big on relopen!");
#endif

	/*
	 * Open every segment from 1 up to the one containing blkno, chaining
	 * each new entry onto the previous one.  Segment files are opened with
	 * O_CREAT, so a missing segment is created rather than reported.
	 */
	targsegno = blkno / RELSEG_SIZE;
	for (v = &Md_fdvec[vfd], segno = 1; segno <= targsegno; segno++)
	{
		char	   *segpath;
		MdfdVec    *newv;
		MemoryContext oldcxt;

		/* room for '.', the segment number digits and the terminator */
		segpath = (char *) palloc(strlen(path) + 12);
		sprintf(segpath, "%s.%d", path, segno);

#ifndef __CYGWIN32__
		fd = FileNameOpenFile(segpath, O_RDWR | O_CREAT, 0600);
#else
		fd = FileNameOpenFile(segpath, O_RDWR | O_BINARY | O_CREAT, 0600);
#endif

		pfree(segpath);

		if (fd < 0)
		{
			/*
			 * Entries linked so far remain in Md_fdvec flagged MDFD_TEMP;
			 * only the path string needs releasing here.
			 */
			pfree(path);
			return (MdfdVec *) NULL;
		}

		/* allocate an mdfdvec entry for it */
		oldcxt = MemoryContextSwitchTo(MdCxt);
		newv = (MdfdVec *) palloc(sizeof(MdfdVec));
		MemoryContextSwitchTo(oldcxt);

		/* fill the entry */
		newv->mdfd_vfd = fd;
		newv->mdfd_flags = MDFD_TEMP;
		newv->mdfd_lstbcnt = _mdnblocks(fd, BLCKSZ);
		newv->mdfd_chain = (MdfdVec *) NULL;

#ifdef DIAGNOSTIC
		if (newv->mdfd_lstbcnt > RELSEG_SIZE)
			elog(FATAL, "segment too big on open!");
#endif

		v->mdfd_chain = newv;
		v = newv;
	}
#else
	v = &Md_fdvec[vfd];
#endif

	pfree(path);

	return v;
}
static BlockNumber static BlockNumber
_mdnblocks(File file, Size blcksz) _mdnblocks(File file, Size blcksz)
{ {
......
...@@ -11,7 +11,7 @@ ...@@ -11,7 +11,7 @@
* *
* *
* IDENTIFICATION * IDENTIFICATION
* $Header: /cvsroot/pgsql/src/backend/storage/smgr/smgr.c,v 1.32 2000/01/26 05:57:05 momjian Exp $ * $Header: /cvsroot/pgsql/src/backend/storage/smgr/smgr.c,v 1.33 2000/04/09 04:43:20 tgl Exp $
* *
*------------------------------------------------------------------------- *-------------------------------------------------------------------------
*/ */
...@@ -23,21 +23,30 @@ static void smgrshutdown(int dummy); ...@@ -23,21 +23,30 @@ static void smgrshutdown(int dummy);
typedef struct f_smgr typedef struct f_smgr
{ {
int (*smgr_init) ();/* may be NULL */ int (*smgr_init) (void); /* may be NULL */
int (*smgr_shutdown) (); /* may be NULL */ int (*smgr_shutdown) (void); /* may be NULL */
int (*smgr_create) (); int (*smgr_create) (Relation reln);
int (*smgr_unlink) (); int (*smgr_unlink) (Relation reln);
int (*smgr_extend) (); int (*smgr_extend) (Relation reln, char *buffer);
int (*smgr_open) (); int (*smgr_open) (Relation reln);
int (*smgr_close) (); int (*smgr_close) (Relation reln);
int (*smgr_read) (); int (*smgr_read) (Relation reln, BlockNumber blocknum,
int (*smgr_write) (); char *buffer);
int (*smgr_flush) (); int (*smgr_write) (Relation reln, BlockNumber blocknum,
int (*smgr_blindwrt) (); char *buffer);
int (*smgr_nblocks) (); int (*smgr_flush) (Relation reln, BlockNumber blocknum,
int (*smgr_truncate) (); char *buffer);
int (*smgr_commit) (); /* may be NULL */ int (*smgr_blindwrt) (char *dbname, char *relname,
int (*smgr_abort) (); /* may be NULL */ Oid dbid, Oid relid,
BlockNumber blkno, char *buffer);
int (*smgr_markdirty) (Relation reln, BlockNumber blkno);
int (*smgr_blindmarkdirty) (char *dbname, char *relname,
Oid dbid, Oid relid,
BlockNumber blkno);
int (*smgr_nblocks) (Relation reln);
int (*smgr_truncate) (Relation reln, int nblocks);
int (*smgr_commit) (void); /* may be NULL */
int (*smgr_abort) (void); /* may be NULL */
} f_smgr; } f_smgr;
/* /*
...@@ -49,14 +58,14 @@ static f_smgr smgrsw[] = { ...@@ -49,14 +58,14 @@ static f_smgr smgrsw[] = {
/* magnetic disk */ /* magnetic disk */
{mdinit, NULL, mdcreate, mdunlink, mdextend, mdopen, mdclose, {mdinit, NULL, mdcreate, mdunlink, mdextend, mdopen, mdclose,
mdread, mdwrite, mdflush, mdblindwrt, mdnblocks, mdtruncate, mdread, mdwrite, mdflush, mdblindwrt, mdmarkdirty, mdblindmarkdirty,
mdcommit, mdabort}, mdnblocks, mdtruncate, mdcommit, mdabort},
#ifdef STABLE_MEMORY_STORAGE #ifdef STABLE_MEMORY_STORAGE
/* main memory */ /* main memory */
{mminit, mmshutdown, mmcreate, mmunlink, mmextend, mmopen, mmclose, {mminit, mmshutdown, mmcreate, mmunlink, mmextend, mmopen, mmclose,
mmread, mmwrite, mmflush, mmblindwrt, mmnblocks, NULL, mmread, mmwrite, mmflush, mmblindwrt, mmmarkdirty, mmblindmarkdirty,
mmcommit, mmabort}, mmnblocks, NULL, mmcommit, mmabort},
#endif #endif
}; };
...@@ -299,6 +308,7 @@ smgrblindwrt(int16 which, ...@@ -299,6 +308,7 @@ smgrblindwrt(int16 which,
char *relstr; char *relstr;
int status; int status;
/* strdup here is probably redundant */
dbstr = pstrdup(dbname); dbstr = pstrdup(dbname);
relstr = pstrdup(relname); relstr = pstrdup(relname);
...@@ -315,6 +325,67 @@ smgrblindwrt(int16 which, ...@@ -315,6 +325,67 @@ smgrblindwrt(int16 which,
return status; return status;
} }
/*
 *	smgrmarkdirty() -- Mark a page dirty (needs fsync).
 *
 *		Mark the specified page as needing to be fsync'd before commit.
 *		Ordinarily, the storage manager will do this implicitly during
 *		smgrwrite().  However, the buffer manager may discover that some
 *		other backend has written a buffer that we dirtied in the current
 *		transaction.  In that case, we still need to fsync the file to be
 *		sure the page is down to disk before we commit.
 *
 *		"which" selects the storage manager in smgrsw[]; the request is
 *		passed to that manager's smgr_markdirty routine.  An SM_FAIL
 *		result is reported through elog(ERROR); otherwise the manager's
 *		status is returned.
 */
int
smgrmarkdirty(int16 which,
			  Relation reln,
			  BlockNumber blkno)
{
	int			status;

	status = (*(smgrsw[which].smgr_markdirty)) (reln, blkno);

	/*
	 * BlockNumber is an unsigned type (see storage/block.h), so it is
	 * printed with %u; %d would show large block numbers as negative.
	 */
	if (status == SM_FAIL)
		elog(ERROR, "cannot mark block %u of %s",
			 blkno, RelationGetRelationName(reln));

	return status;
}
/*
 *	smgrblindmarkdirty() -- Mark a page dirty, "blind".
 *
 *		Just like smgrmarkdirty, except we don't have a reldesc: the
 *		page is identified by database/relation name and OID instead.
 *
 *		"which" selects the storage manager in smgrsw[]; the request is
 *		passed to that manager's smgr_blindmarkdirty routine.  An SM_FAIL
 *		result is reported through elog(ERROR); otherwise the manager's
 *		status is returned.
 */
int
smgrblindmarkdirty(int16 which,
				   char *dbname,
				   char *relname,
				   Oid dbid,
				   Oid relid,
				   BlockNumber blkno)
{
	char	   *dbstr;
	char	   *relstr;
	int			status;

	/* strdup here is probably redundant */
	dbstr = pstrdup(dbname);
	relstr = pstrdup(relname);

	status = (*(smgrsw[which].smgr_blindmarkdirty)) (dbstr, relstr,
													 dbid, relid,
													 blkno);

	/*
	 * Release the private copies before reporting any failure, so they
	 * are freed on the error path too; the message uses the caller's
	 * strings, which hold the same names.
	 */
	pfree(dbstr);
	pfree(relstr);

	/*
	 * BlockNumber is an unsigned type (see storage/block.h), so it is
	 * printed with %u; %d would show large block numbers as negative.
	 */
	if (status == SM_FAIL)
		elog(ERROR, "cannot mark block %u of %s [%s] blind",
			 blkno, relname, dbname);

	return status;
}
/* /*
* smgrnblocks() -- Calculate the number of POSTGRES blocks in the * smgrnblocks() -- Calculate the number of POSTGRES blocks in the
* supplied relation. * supplied relation.
...@@ -378,7 +449,6 @@ smgrcommit() ...@@ -378,7 +449,6 @@ smgrcommit()
return SM_SUCCESS; return SM_SUCCESS;
} }
#ifdef NOT_USED
int int
smgrabort() smgrabort()
{ {
...@@ -396,8 +466,6 @@ smgrabort() ...@@ -396,8 +466,6 @@ smgrabort()
return SM_SUCCESS; return SM_SUCCESS;
} }
#endif
#ifdef NOT_USED #ifdef NOT_USED
bool bool
smgriswo(int16 smgrno) smgriswo(int16 smgrno)
......
...@@ -7,7 +7,7 @@ ...@@ -7,7 +7,7 @@
* Portions Copyright (c) 1996-2000, PostgreSQL, Inc * Portions Copyright (c) 1996-2000, PostgreSQL, Inc
* Portions Copyright (c) 1994, Regents of the University of California * Portions Copyright (c) 1994, Regents of the University of California
* *
* $Id: catalog.h,v 1.10 2000/01/26 05:57:56 momjian Exp $ * $Id: catalog.h,v 1.11 2000/04/09 04:43:14 tgl Exp $
* *
*------------------------------------------------------------------------- *-------------------------------------------------------------------------
*/ */
...@@ -17,6 +17,8 @@ ...@@ -17,6 +17,8 @@
#include "access/tupdesc.h" #include "access/tupdesc.h"
extern char *relpath(const char *relname); extern char *relpath(const char *relname);
extern char *relpath_blind(const char *dbname, const char *relname,
Oid dbid, Oid relid);
extern bool IsSystemRelationName(const char *relname); extern bool IsSystemRelationName(const char *relname);
extern bool IsSharedSystemRelationName(const char *relname); extern bool IsSharedSystemRelationName(const char *relname);
extern Oid newoid(void); extern Oid newoid(void);
......
...@@ -7,7 +7,7 @@ ...@@ -7,7 +7,7 @@
* Portions Copyright (c) 1996-2000, PostgreSQL, Inc * Portions Copyright (c) 1996-2000, PostgreSQL, Inc
* Portions Copyright (c) 1994, Regents of the University of California * Portions Copyright (c) 1994, Regents of the University of California
* *
* $Id: buf_internals.h,v 1.35 2000/01/26 05:58:32 momjian Exp $ * $Id: buf_internals.h,v 1.36 2000/04/09 04:43:18 tgl Exp $
* *
*------------------------------------------------------------------------- *-------------------------------------------------------------------------
*/ */
...@@ -61,6 +61,16 @@ typedef struct buftag ...@@ -61,6 +61,16 @@ typedef struct buftag
(a)->relId = (xx_reln)->rd_lockInfo.lockRelId \ (a)->relId = (xx_reln)->rd_lockInfo.lockRelId \
) )
/* If we have to write a buffer "blind" (without a relcache entry),
* the BufferTag is not enough information. BufferBlindId carries the
* additional information needed: the names of the database and of the
* relation, each held in a fixed-size array of NAMEDATALEN bytes.
*/
typedef struct bufblindid
{
char dbname[NAMEDATALEN]; /* name of the database in which the buffer belongs */
char relname[NAMEDATALEN]; /* name of the relation in which the buffer belongs */
} BufferBlindId;
#define BAD_BUFFER_ID(bid) ((bid) < 1 || (bid) > NBuffers) #define BAD_BUFFER_ID(bid) ((bid) < 1 || (bid) > NBuffers)
#define INVALID_DESCRIPTOR (-3) #define INVALID_DESCRIPTOR (-3)
...@@ -98,8 +108,7 @@ typedef struct sbufdesc ...@@ -98,8 +108,7 @@ typedef struct sbufdesc
bool ri_lock; /* read-intent lock */ bool ri_lock; /* read-intent lock */
bool w_lock; /* context exclusively locked */ bool w_lock; /* context exclusively locked */
char sb_dbname[NAMEDATALEN]; /* name of db in which buf belongs */ BufferBlindId blind; /* extra info to support blind write */
char sb_relname[NAMEDATALEN]; /* name of reln */
} BufferDesc; } BufferDesc;
/* /*
...@@ -164,7 +173,9 @@ extern BufferDesc *BufferDescriptors; ...@@ -164,7 +173,9 @@ extern BufferDesc *BufferDescriptors;
extern BufferBlock BufferBlocks; extern BufferBlock BufferBlocks;
extern long *PrivateRefCount; extern long *PrivateRefCount;
extern bits8 *BufferLocks; extern bits8 *BufferLocks;
extern long *CommitInfoNeedsSave; extern BufferTag *BufferTagLastDirtied;
extern BufferBlindId *BufferBlindLastDirtied;
extern bool *BufferDirtiedByMe;
extern SPINLOCK BufMgrLock; extern SPINLOCK BufMgrLock;
/* localbuf.c */ /* localbuf.c */
......
...@@ -7,7 +7,7 @@ ...@@ -7,7 +7,7 @@
* Portions Copyright (c) 1996-2000, PostgreSQL, Inc * Portions Copyright (c) 1996-2000, PostgreSQL, Inc
* Portions Copyright (c) 1994, Regents of the University of California * Portions Copyright (c) 1994, Regents of the University of California
* *
* $Id: bufmgr.h,v 1.35 2000/03/31 02:43:30 tgl Exp $ * $Id: bufmgr.h,v 1.36 2000/04/09 04:43:18 tgl Exp $
* *
*------------------------------------------------------------------------- *-------------------------------------------------------------------------
*/ */
...@@ -164,7 +164,7 @@ extern int FlushBuffer(Buffer buffer, bool release); ...@@ -164,7 +164,7 @@ extern int FlushBuffer(Buffer buffer, bool release);
extern void InitBufferPool(IPCKey key); extern void InitBufferPool(IPCKey key);
extern void PrintBufferUsage(FILE *statfp); extern void PrintBufferUsage(FILE *statfp);
extern void ResetBufferUsage(void); extern void ResetBufferUsage(void);
extern void ResetBufferPool(void); extern void ResetBufferPool(bool isCommit);
extern int BufferPoolCheckLeak(void); extern int BufferPoolCheckLeak(void);
extern void FlushBufferPool(void); extern void FlushBufferPool(void);
extern BlockNumber BufferGetBlockNumber(Buffer buffer); extern BlockNumber BufferGetBlockNumber(Buffer buffer);
......
...@@ -7,7 +7,7 @@ ...@@ -7,7 +7,7 @@
* Portions Copyright (c) 1996-2000, PostgreSQL, Inc * Portions Copyright (c) 1996-2000, PostgreSQL, Inc
* Portions Copyright (c) 1994, Regents of the University of California * Portions Copyright (c) 1994, Regents of the University of California
* *
* $Id: fd.h,v 1.19 2000/01/26 05:58:32 momjian Exp $ * $Id: fd.h,v 1.20 2000/04/09 04:43:18 tgl Exp $
* *
*------------------------------------------------------------------------- *-------------------------------------------------------------------------
*/ */
...@@ -15,7 +15,7 @@ ...@@ -15,7 +15,7 @@
/* /*
* calls: * calls:
* *
* File {Close, Read, Write, Seek, Tell, Sync} * File {Close, Read, Write, Seek, Tell, MarkDirty, Sync}
* {File Name Open, Allocate, Free} File * {File Name Open, Allocate, Free} File
* *
* These are NOT JUST RENAMINGS OF THE UNIX ROUTINES. * These are NOT JUST RENAMINGS OF THE UNIX ROUTINES.
...@@ -58,6 +58,7 @@ extern int FileWrite(File file, char *buffer, int amount); ...@@ -58,6 +58,7 @@ extern int FileWrite(File file, char *buffer, int amount);
extern long FileSeek(File file, long offset, int whence); extern long FileSeek(File file, long offset, int whence);
extern int FileTruncate(File file, long offset); extern int FileTruncate(File file, long offset);
extern int FileSync(File file); extern int FileSync(File file);
extern void FileMarkDirty(File file);
/* Operations that allow use of regular stdio --- USE WITH CAUTION */ /* Operations that allow use of regular stdio --- USE WITH CAUTION */
extern FILE *AllocateFile(char *name, char *mode); extern FILE *AllocateFile(char *name, char *mode);
......
...@@ -7,7 +7,7 @@ ...@@ -7,7 +7,7 @@
* Portions Copyright (c) 1996-2000, PostgreSQL, Inc * Portions Copyright (c) 1996-2000, PostgreSQL, Inc
* Portions Copyright (c) 1994, Regents of the University of California * Portions Copyright (c) 1994, Regents of the University of California
* *
* $Id: smgr.h,v 1.17 2000/01/26 05:58:33 momjian Exp $ * $Id: smgr.h,v 1.18 2000/04/09 04:43:18 tgl Exp $
* *
*------------------------------------------------------------------------- *-------------------------------------------------------------------------
*/ */
...@@ -29,17 +29,23 @@ extern int smgrunlink(int16 which, Relation reln); ...@@ -29,17 +29,23 @@ extern int smgrunlink(int16 which, Relation reln);
extern int smgrextend(int16 which, Relation reln, char *buffer); extern int smgrextend(int16 which, Relation reln, char *buffer);
extern int smgropen(int16 which, Relation reln); extern int smgropen(int16 which, Relation reln);
extern int smgrclose(int16 which, Relation reln); extern int smgrclose(int16 which, Relation reln);
extern int smgrread(int16 which, Relation reln, BlockNumber blocknum, extern int smgrread(int16 which, Relation reln, BlockNumber blocknum,
char *buffer); char *buffer);
extern int smgrwrite(int16 which, Relation reln, BlockNumber blocknum, extern int smgrwrite(int16 which, Relation reln, BlockNumber blocknum,
char *buffer); char *buffer);
extern int smgrflush(int16 which, Relation reln, BlockNumber blocknum, extern int smgrflush(int16 which, Relation reln, BlockNumber blocknum,
char *buffer); char *buffer);
extern int smgrblindwrt(int16 which, char *dbname, char *relname, Oid dbid, extern int smgrblindwrt(int16 which, char *dbname, char *relname,
Oid relid, BlockNumber blkno, char *buffer); Oid dbid, Oid relid,
BlockNumber blkno, char *buffer);
extern int smgrmarkdirty(int16 which, Relation reln, BlockNumber blkno);
extern int smgrblindmarkdirty(int16 which, char *dbname, char *relname,
Oid dbid, Oid relid,
BlockNumber blkno);
extern int smgrnblocks(int16 which, Relation reln); extern int smgrnblocks(int16 which, Relation reln);
extern int smgrtruncate(int16 which, Relation reln, int nblocks); extern int smgrtruncate(int16 which, Relation reln, int nblocks);
extern int smgrcommit(void); extern int smgrcommit(void);
extern int smgrabort(void);
...@@ -55,8 +61,11 @@ extern int mdclose(Relation reln); ...@@ -55,8 +61,11 @@ extern int mdclose(Relation reln);
extern int mdread(Relation reln, BlockNumber blocknum, char *buffer); extern int mdread(Relation reln, BlockNumber blocknum, char *buffer);
extern int mdwrite(Relation reln, BlockNumber blocknum, char *buffer); extern int mdwrite(Relation reln, BlockNumber blocknum, char *buffer);
extern int mdflush(Relation reln, BlockNumber blocknum, char *buffer); extern int mdflush(Relation reln, BlockNumber blocknum, char *buffer);
extern int mdblindwrt(char *dbstr, char *relstr, Oid dbid, Oid relid, extern int mdblindwrt(char *dbname, char *relname, Oid dbid, Oid relid,
BlockNumber blkno, char *buffer); BlockNumber blkno, char *buffer);
extern int mdmarkdirty(Relation reln, BlockNumber blkno);
extern int mdblindmarkdirty(char *dbname, char *relname, Oid dbid, Oid relid,
BlockNumber blkno);
extern int mdnblocks(Relation reln); extern int mdnblocks(Relation reln);
extern int mdtruncate(Relation reln, int nblocks); extern int mdtruncate(Relation reln, int nblocks);
extern int mdcommit(void); extern int mdcommit(void);
...@@ -66,7 +75,6 @@ extern int mdabort(void); ...@@ -66,7 +75,6 @@ extern int mdabort(void);
extern SPINLOCK MMCacheLock; extern SPINLOCK MMCacheLock;
extern int mminit(void); extern int mminit(void);
extern int mmshutdown(void);
extern int mmcreate(Relation reln); extern int mmcreate(Relation reln);
extern int mmunlink(Relation reln); extern int mmunlink(Relation reln);
extern int mmextend(Relation reln, char *buffer); extern int mmextend(Relation reln, char *buffer);
...@@ -75,11 +83,17 @@ extern int mmclose(Relation reln); ...@@ -75,11 +83,17 @@ extern int mmclose(Relation reln);
extern int mmread(Relation reln, BlockNumber blocknum, char *buffer); extern int mmread(Relation reln, BlockNumber blocknum, char *buffer);
extern int mmwrite(Relation reln, BlockNumber blocknum, char *buffer); extern int mmwrite(Relation reln, BlockNumber blocknum, char *buffer);
extern int mmflush(Relation reln, BlockNumber blocknum, char *buffer); extern int mmflush(Relation reln, BlockNumber blocknum, char *buffer);
extern int mmblindwrt(char *dbstr, char *relstr, Oid dbid, Oid relid, extern int mmblindwrt(char *dbname, char *relname, Oid dbid, Oid relid,
BlockNumber blkno, char *buffer); BlockNumber blkno, char *buffer);
extern int mmmarkdirty(Relation reln, BlockNumber blkno);
extern int mmblindmarkdirty(char *dbname, char *relname, Oid dbid, Oid relid,
BlockNumber blkno);
extern int mmnblocks(Relation reln); extern int mmnblocks(Relation reln);
extern int mmtruncate(Relation reln, int nblocks);
extern int mmcommit(void); extern int mmcommit(void);
extern int mmabort(void); extern int mmabort(void);
extern int mmshutdown(void);
extern int MMShmemSize(void); extern int MMShmemSize(void);
/* smgrtype.c */ /* smgrtype.c */
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment