Commit 6d61cdec authored by Tom Lane

Clean up and document the API for XLogOpenRelation and XLogReadBuffer.

This commit doesn't make much functional change, but it does eliminate some
duplicated code --- for instance, PageIsNew tests are now done inside
XLogReadBuffer rather than by each caller.
The GIST xlog code still needs a lot of love, but I'll worry about that
separately.
parent 2154e1c1
......@@ -8,7 +8,7 @@
* Portions Copyright (c) 1994, Regents of the University of California
*
* IDENTIFICATION
* $PostgreSQL: pgsql/src/backend/access/gist/gistxlog.c,v 1.11 2006/03/24 04:32:12 tgl Exp $
* $PostgreSQL: pgsql/src/backend/access/gist/gistxlog.c,v 1.12 2006/03/29 21:17:36 tgl Exp $
*-------------------------------------------------------------------------
*/
#include "postgres.h"
......@@ -177,9 +177,7 @@ gistRedoEntryUpdateRecord(XLogRecPtr lsn, XLogRecord *record, bool isnewroot)
decodeEntryUpdateRecord(&xlrec, record);
reln = XLogOpenRelation(xlrec.data->node);
if (!RelationIsValid(reln))
return;
buffer = XLogReadBuffer(false, reln, xlrec.data->blkno);
buffer = XLogReadBuffer(reln, xlrec.data->blkno, false);
if (!BufferIsValid(buffer))
elog(PANIC, "block %u unfound", xlrec.data->blkno);
page = (Page) BufferGetPage(buffer);
......@@ -195,8 +193,6 @@ gistRedoEntryUpdateRecord(XLogRecPtr lsn, XLogRecord *record, bool isnewroot)
}
else
{
if (PageIsNew((PageHeader) page))
elog(PANIC, "uninitialized page %u", xlrec.data->blkno);
if (XLByteLE(lsn, PageGetLSN(page)))
{
LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
......@@ -302,17 +298,12 @@ gistRedoPageSplitRecord(XLogRecPtr lsn, XLogRecord *record)
decodePageSplitRecord(&xlrec, record);
reln = XLogOpenRelation(xlrec.data->node);
if (!RelationIsValid(reln))
return;
/* first of all we need to get the F_LEAF flag from the original page */
buffer = XLogReadBuffer(false, reln, xlrec.data->origblkno);
buffer = XLogReadBuffer(reln, xlrec.data->origblkno, false);
if (!BufferIsValid(buffer))
elog(PANIC, "block %u unfound", xlrec.data->origblkno);
page = (Page) BufferGetPage(buffer);
if (PageIsNew((PageHeader) page))
elog(PANIC, "uninitialized page %u", xlrec.data->origblkno);
flags = (GistPageIsLeaf(page)) ? F_LEAF : 0;
LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
ReleaseBuffer(buffer);
......@@ -323,7 +314,7 @@ gistRedoPageSplitRecord(XLogRecPtr lsn, XLogRecord *record)
NewPage *newpage = xlrec.page + i;
bool isorigpage = (xlrec.data->origblkno == newpage->header->blkno) ? true : false;
buffer = XLogReadBuffer(!isorigpage, reln, newpage->header->blkno);
buffer = XLogReadBuffer(reln, newpage->header->blkno, !isorigpage);
if (!BufferIsValid(buffer))
elog(PANIC, "block %u unfound", newpage->header->blkno);
page = (Page) BufferGetPage(buffer);
......@@ -367,24 +358,15 @@ gistRedoCreateIndex(XLogRecPtr lsn, XLogRecord *record)
Page page;
reln = XLogOpenRelation(*node);
if (!RelationIsValid(reln))
return;
buffer = XLogReadBuffer(true, reln, GIST_ROOT_BLKNO);
if (!BufferIsValid(buffer))
elog(PANIC, "root block unfound");
buffer = XLogReadBuffer(reln, GIST_ROOT_BLKNO, true);
Assert(BufferIsValid(buffer));
page = (Page) BufferGetPage(buffer);
if (!PageIsNew((PageHeader) page) && XLByteLE(lsn, PageGetLSN(page)))
{
LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
ReleaseBuffer(buffer);
return;
}
GISTInitBuffer(buffer, F_LEAF);
PageSetLSN(page, lsn);
PageSetTLI(page, ThisTimeLineID);
LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
WriteBuffer(buffer);
}
......@@ -527,12 +509,10 @@ gist_form_invalid_tuple(BlockNumber blkno)
static Buffer
gistXLogReadAndLockBuffer(Relation r, BlockNumber blkno)
{
Buffer buffer = XLogReadBuffer(false, r, blkno);
Buffer buffer = XLogReadBuffer(r, blkno, false);
if (!BufferIsValid(buffer))
elog(PANIC, "block %u unfound", blkno);
if (PageIsNew((PageHeader) (BufferGetPage(buffer))))
elog(PANIC, "uninitialized page %u", blkno);
return buffer;
}
......@@ -590,8 +570,6 @@ gistContinueInsert(gistIncompleteInsert *insert)
Relation index;
index = XLogOpenRelation(insert->node);
if (!RelationIsValid(index))
return;
/*
* needed vector itup never will be more than initial lenblkno+2, because
......@@ -606,29 +584,22 @@ gistContinueInsert(gistIncompleteInsert *insert)
if (insert->origblkno == GIST_ROOT_BLKNO)
{
/*
* it was split root, so we should only make new root. it can't be
* it was split root, so we should only make new root. it can't be
* simple insert into root, look at call pushIncompleteInsert in
* gistRedoPageSplitRecord
*/
Buffer buffer = XLogReadBuffer(true, index, GIST_ROOT_BLKNO);
Buffer buffer = XLogReadBuffer(index, GIST_ROOT_BLKNO, true);
Page page;
if (!BufferIsValid(buffer))
elog(PANIC, "root block unfound");
Assert(BufferIsValid(buffer));
page = BufferGetPage(buffer);
if (XLByteLE(insert->lsn, PageGetLSN(page)))
{
LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
ReleaseBuffer(buffer);
return;
}
GISTInitBuffer(buffer, 0);
page = BufferGetPage(buffer);
gistfillbuffer(index, page, itup, lenitup, FirstOffsetNumber);
PageSetLSN(page, insert->lsn);
PageSetTLI(page, ThisTimeLineID);
LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
WriteBuffer(buffer);
}
......@@ -654,12 +625,10 @@ gistContinueInsert(gistIncompleteInsert *insert)
childfound = 0;
numbuffer = 1;
buffers[numbuffer - 1] = XLogReadBuffer(false, index, insert->path[i]);
buffers[numbuffer - 1] = XLogReadBuffer(index, insert->path[i], false);
if (!BufferIsValid(buffers[numbuffer - 1]))
elog(PANIC, "block %u unfound", insert->path[i]);
pages[numbuffer - 1] = BufferGetPage(buffers[numbuffer - 1]);
if (PageIsNew((PageHeader) (pages[numbuffer - 1])))
elog(PANIC, "uninitialized page %u", insert->path[i]);
if (XLByteLE(insert->lsn, PageGetLSN(pages[numbuffer - 1])))
{
......@@ -693,7 +662,7 @@ gistContinueInsert(gistIncompleteInsert *insert)
if (gistnospace(pages[numbuffer - 1], itup, lenitup))
{
/* no space left on page, so we should split */
buffers[numbuffer] = XLogReadBuffer(true, index, P_NEW);
buffers[numbuffer] = XLogReadBuffer(index, P_NEW, true);
if (!BufferIsValid(buffers[numbuffer]))
elog(PANIC, "could not obtain new block");
GISTInitBuffer(buffers[numbuffer], 0);
......@@ -717,7 +686,7 @@ gistContinueInsert(gistIncompleteInsert *insert)
RelationGetRelationName(index));
/* fill new page */
buffers[numbuffer] = XLogReadBuffer(true, index, P_NEW);
buffers[numbuffer] = XLogReadBuffer(index, P_NEW, true);
if (!BufferIsValid(buffers[numbuffer]))
elog(PANIC, "could not obtain new block");
GISTInitBuffer(buffers[numbuffer], 0);
......
......@@ -8,7 +8,7 @@
*
*
* IDENTIFICATION
* $PostgreSQL: pgsql/src/backend/access/heap/heapam.c,v 1.209 2006/03/24 04:32:12 tgl Exp $
* $PostgreSQL: pgsql/src/backend/access/heap/heapam.c,v 1.210 2006/03/29 21:17:36 tgl Exp $
*
*
* INTERFACE ROUTINES
......@@ -2888,16 +2888,10 @@ heap_xlog_clean(XLogRecPtr lsn, XLogRecord *record)
return;
reln = XLogOpenRelation(xlrec->node);
if (!RelationIsValid(reln))
return;
buffer = XLogReadBuffer(false, reln, xlrec->block);
buffer = XLogReadBuffer(reln, xlrec->block, false);
if (!BufferIsValid(buffer))
elog(PANIC, "heap_clean_redo: no block");
return;
page = (Page) BufferGetPage(buffer);
if (PageIsNew((PageHeader) page))
elog(PANIC, "heap_clean_redo: uninitialized page");
if (XLByteLE(lsn, PageGetLSN(page)))
{
......@@ -2943,16 +2937,9 @@ heap_xlog_newpage(XLogRecPtr lsn, XLogRecord *record)
* Note: the NEWPAGE log record is used for both heaps and indexes, so do
* not do anything that assumes we are touching a heap.
*/
if (record->xl_info & XLR_BKP_BLOCK_1)
return;
reln = XLogOpenRelation(xlrec->node);
if (!RelationIsValid(reln))
return;
buffer = XLogReadBuffer(true, reln, xlrec->blkno);
if (!BufferIsValid(buffer))
elog(PANIC, "heap_newpage_redo: no block");
buffer = XLogReadBuffer(reln, xlrec->blkno, true);
Assert(BufferIsValid(buffer));
page = (Page) BufferGetPage(buffer);
Assert(record->xl_len == SizeOfHeapNewpage + BLCKSZ);
......@@ -2979,18 +2966,12 @@ heap_xlog_delete(XLogRecPtr lsn, XLogRecord *record)
return;
reln = XLogOpenRelation(xlrec->target.node);
if (!RelationIsValid(reln))
return;
buffer = XLogReadBuffer(false, reln,
ItemPointerGetBlockNumber(&(xlrec->target.tid)));
buffer = XLogReadBuffer(reln,
ItemPointerGetBlockNumber(&(xlrec->target.tid)),
false);
if (!BufferIsValid(buffer))
elog(PANIC, "heap_delete_redo: no block");
return;
page = (Page) BufferGetPage(buffer);
if (PageIsNew((PageHeader) page))
elog(PANIC, "heap_delete_redo: uninitialized page");
if (XLByteLE(lsn, PageGetLSN(page))) /* changes are applied */
{
......@@ -3045,27 +3026,31 @@ heap_xlog_insert(XLogRecPtr lsn, XLogRecord *record)
reln = XLogOpenRelation(xlrec->target.node);
if (!RelationIsValid(reln))
return;
buffer = XLogReadBuffer(true, reln,
ItemPointerGetBlockNumber(&(xlrec->target.tid)));
if (!BufferIsValid(buffer))
return;
page = (Page) BufferGetPage(buffer);
if (PageIsNew((PageHeader) page) &&
!(record->xl_info & XLOG_HEAP_INIT_PAGE))
elog(PANIC, "heap_insert_redo: uninitialized page");
if (record->xl_info & XLOG_HEAP_INIT_PAGE)
PageInit(page, BufferGetPageSize(buffer), 0);
{
buffer = XLogReadBuffer(reln,
ItemPointerGetBlockNumber(&(xlrec->target.tid)),
true);
Assert(BufferIsValid(buffer));
page = (Page) BufferGetPage(buffer);
if (XLByteLE(lsn, PageGetLSN(page))) /* changes are applied */
PageInit(page, BufferGetPageSize(buffer), 0);
}
else
{
LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
ReleaseBuffer(buffer);
return;
buffer = XLogReadBuffer(reln,
ItemPointerGetBlockNumber(&(xlrec->target.tid)),
false);
if (!BufferIsValid(buffer))
return;
page = (Page) BufferGetPage(buffer);
if (XLByteLE(lsn, PageGetLSN(page))) /* changes are applied */
{
LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
ReleaseBuffer(buffer);
return;
}
}
offnum = ItemPointerGetOffsetNumber(&(xlrec->target.tid));
......@@ -3110,9 +3095,8 @@ heap_xlog_update(XLogRecPtr lsn, XLogRecord *record, bool move)
xl_heap_update *xlrec = (xl_heap_update *) XLogRecGetData(record);
Relation reln = XLogOpenRelation(xlrec->target.node);
Buffer buffer;
bool samepage =
(ItemPointerGetBlockNumber(&(xlrec->newtid)) ==
ItemPointerGetBlockNumber(&(xlrec->target.tid)));
bool samepage = (ItemPointerGetBlockNumber(&(xlrec->newtid)) ==
ItemPointerGetBlockNumber(&(xlrec->target.tid)));
Page page;
OffsetNumber offnum;
ItemId lp = NULL;
......@@ -3126,22 +3110,21 @@ heap_xlog_update(XLogRecPtr lsn, XLogRecord *record, bool move)
int hsize;
uint32 newlen;
if (!RelationIsValid(reln))
return;
if (record->xl_info & XLR_BKP_BLOCK_1)
{
if (samepage)
return; /* backup block covered both changes */
goto newt;
}
/* Deal with old tuple version */
buffer = XLogReadBuffer(false, reln,
ItemPointerGetBlockNumber(&(xlrec->target.tid)));
buffer = XLogReadBuffer(reln,
ItemPointerGetBlockNumber(&(xlrec->target.tid)),
false);
if (!BufferIsValid(buffer))
elog(PANIC, "heap_update_redo: no block");
goto newt;
page = (Page) BufferGetPage(buffer);
if (PageIsNew((PageHeader) page))
elog(PANIC, "heap_update_redo: uninitialized old page");
if (XLByteLE(lsn, PageGetLSN(page))) /* changes are applied */
{
......@@ -3183,6 +3166,10 @@ heap_xlog_update(XLogRecPtr lsn, XLogRecord *record, bool move)
/* Set forward chain link in t_ctid */
htup->t_ctid = xlrec->newtid;
}
/*
* this test is ugly, but necessary to avoid thinking that insert change
* is already applied
*/
if (samepage)
goto newsame;
PageSetLSN(page, lsn);
......@@ -3194,32 +3181,38 @@ heap_xlog_update(XLogRecPtr lsn, XLogRecord *record, bool move)
newt:;
if ((record->xl_info & XLR_BKP_BLOCK_2) ||
((record->xl_info & XLR_BKP_BLOCK_1) && samepage))
return;
buffer = XLogReadBuffer(true, reln,
ItemPointerGetBlockNumber(&(xlrec->newtid)));
if (!BufferIsValid(buffer))
if (record->xl_info & XLR_BKP_BLOCK_2)
return;
page = (Page) BufferGetPage(buffer);
newsame:;
if (PageIsNew((PageHeader) page) &&
!(record->xl_info & XLOG_HEAP_INIT_PAGE))
elog(PANIC, "heap_update_redo: uninitialized page");
if (record->xl_info & XLOG_HEAP_INIT_PAGE)
PageInit(page, BufferGetPageSize(buffer), 0);
{
buffer = XLogReadBuffer(reln,
ItemPointerGetBlockNumber(&(xlrec->newtid)),
true);
Assert(BufferIsValid(buffer));
page = (Page) BufferGetPage(buffer);
if (XLByteLE(lsn, PageGetLSN(page))) /* changes are applied */
PageInit(page, BufferGetPageSize(buffer), 0);
}
else
{
LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
ReleaseBuffer(buffer);
return;
buffer = XLogReadBuffer(reln,
ItemPointerGetBlockNumber(&(xlrec->newtid)),
false);
if (!BufferIsValid(buffer))
return;
page = (Page) BufferGetPage(buffer);
if (XLByteLE(lsn, PageGetLSN(page))) /* changes are applied */
{
LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
ReleaseBuffer(buffer);
return;
}
}
newsame:;
offnum = ItemPointerGetOffsetNumber(&(xlrec->newtid));
if (PageGetMaxOffsetNumber(page) + 1 < offnum)
elog(PANIC, "heap_update_redo: invalid max offset number");
......@@ -3288,18 +3281,12 @@ heap_xlog_lock(XLogRecPtr lsn, XLogRecord *record)
return;
reln = XLogOpenRelation(xlrec->target.node);
if (!RelationIsValid(reln))
return;
buffer = XLogReadBuffer(false, reln,
ItemPointerGetBlockNumber(&(xlrec->target.tid)));
buffer = XLogReadBuffer(reln,
ItemPointerGetBlockNumber(&(xlrec->target.tid)),
false);
if (!BufferIsValid(buffer))
elog(PANIC, "heap_lock_redo: no block");
return;
page = (Page) BufferGetPage(buffer);
if (PageIsNew((PageHeader) page))
elog(PANIC, "heap_lock_redo: uninitialized page");
if (XLByteLE(lsn, PageGetLSN(page))) /* changes are applied */
{
......@@ -3381,7 +3368,10 @@ heap_desc(StringInfo buf, uint8 xl_info, char *rec)
{
xl_heap_insert *xlrec = (xl_heap_insert *) rec;
appendStringInfo(buf, "insert: ");
if (xl_info & XLOG_HEAP_INIT_PAGE)
appendStringInfo(buf, "insert(init): ");
else
appendStringInfo(buf, "insert: ");
out_target(buf, &(xlrec->target));
}
else if (info == XLOG_HEAP_DELETE)
......@@ -3391,12 +3381,25 @@ heap_desc(StringInfo buf, uint8 xl_info, char *rec)
appendStringInfo(buf, "delete: ");
out_target(buf, &(xlrec->target));
}
else if (info == XLOG_HEAP_UPDATE || info == XLOG_HEAP_MOVE)
else if (info == XLOG_HEAP_UPDATE)
{
xl_heap_update *xlrec = (xl_heap_update *) rec;
if (info == XLOG_HEAP_UPDATE)
if (xl_info & XLOG_HEAP_INIT_PAGE)
appendStringInfo(buf, "update(init): ");
else
appendStringInfo(buf, "update: ");
out_target(buf, &(xlrec->target));
appendStringInfo(buf, "; new %u/%u",
ItemPointerGetBlockNumber(&(xlrec->newtid)),
ItemPointerGetOffsetNumber(&(xlrec->newtid)));
}
else if (info == XLOG_HEAP_MOVE)
{
xl_heap_update *xlrec = (xl_heap_update *) rec;
if (xl_info & XLOG_HEAP_INIT_PAGE)
appendStringInfo(buf, "move(init): ");
else
appendStringInfo(buf, "move: ");
out_target(buf, &(xlrec->target));
......
This diff is collapsed.
$PostgreSQL: pgsql/src/backend/access/transam/README,v 1.3 2005/05/19 21:35:45 tgl Exp $
$PostgreSQL: pgsql/src/backend/access/transam/README,v 1.4 2006/03/29 21:17:37 tgl Exp $
The Transaction System
----------------------
......@@ -252,3 +252,166 @@ slru.c is the supporting mechanism for both pg_clog and pg_subtrans. It
implements the LRU policy for in-memory buffer pages. The high-level routines
for pg_clog are implemented in transam.c, while the low-level functions are in
clog.c. pg_subtrans is contained completely in subtrans.c.
Write-Ahead Log coding
----------------------
The WAL subsystem (also called XLOG in the code) exists to guarantee crash
recovery. It can also be used to provide point-in-time recovery, as well as
hot-standby replication via log shipping. Here are some notes about
non-obvious aspects of its design.
A basic assumption of a write AHEAD log is that log entries must reach stable
storage before the data-page changes they describe. This ensures that
replaying the log to its end will bring us to a consistent state where there
are no partially-performed transactions. To guarantee this, each data page
(either heap or index) is marked with the LSN (log sequence number --- in
practice, a WAL file location) of the latest XLOG record affecting the page.
Before the bufmgr can write out a dirty page, it must ensure that xlog has
been flushed to disk at least up to the page's LSN. This low-level
interaction improves performance by not waiting for XLOG I/O until necessary.
The LSN check exists only in the shared-buffer manager, not in the local
buffer manager used for temp tables; hence operations on temp tables must not
be WAL-logged.
During WAL replay, we can check the LSN of a page to detect whether the change
recorded by the current log entry is already applied (it has been, if the page
LSN is >= the log entry's WAL location).
Usually, log entries contain just enough information to redo a single
incremental update on a page (or small group of pages). This will work only
if the filesystem and hardware implement data page writes as atomic actions,
so that a page is never left in a corrupt partly-written state. Since that's
often an untenable assumption in practice, we log additional information to
allow complete reconstruction of modified pages. The first WAL record
affecting a given page after a checkpoint is made to contain a copy of the
entire page, and we implement replay by restoring that page copy instead of
redoing the update. (This is more reliable than the data storage itself would
be because we can check the validity of the WAL record's CRC.) We can detect
the "first change after checkpoint" by noting whether the page's old LSN
precedes the end of WAL as of the last checkpoint (the RedoRecPtr).
The general schema for executing a WAL-logged action is
1. Pin and exclusive-lock the shared buffer(s) containing the data page(s)
to be modified.
2. START_CRIT_SECTION() (Any error during the next two steps must cause a
PANIC because the shared buffers will contain unlogged changes, which we
have to ensure don't get to disk. Obviously, you should check conditions
such as whether there's enough free space on the page before you start the
critical section.)
3. Apply the required changes to the shared buffer(s).
4. Build a WAL log record and pass it to XLogInsert(); then update the page's
LSN and TLI using the returned XLOG location. For instance,
recptr = XLogInsert(rmgr_id, info, rdata);
PageSetLSN(dp, recptr);
PageSetTLI(dp, ThisTimeLineID);
5. END_CRIT_SECTION()
6. Unlock and write the buffer(s):
LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
WriteBuffer(buffer);
(Note: WriteBuffer doesn't really "write" the buffer anymore, it just marks it
dirty and unpins it. The write will not happen until a checkpoint occurs or
the shared buffer is needed for another page.)
XLogInsert's "rdata" argument is an array of pointer/size items identifying
chunks of data to be written in the XLOG record, plus optional shared-buffer
IDs for chunks that are in shared buffers rather than temporary variables.
The "rdata" array must mention (at least once) each of the shared buffers
being modified, unless the action is such that the WAL replay routine can
reconstruct the entire page contents. XLogInsert includes the logic that
tests to see whether a shared buffer has been modified since the last
checkpoint. If not, the entire page contents are logged rather than just the
portion(s) pointed to by "rdata".
Because XLogInsert drops the rdata components associated with buffers it
chooses to log in full, the WAL replay routines normally need to test to see
which buffers were handled that way --- otherwise they may be misled about
what the XLOG record actually contains. XLOG records that describe multi-page
changes therefore require some care to design: you must be certain that you
know what data is indicated by each "BKP" bit. An example of the trickiness
is that in a HEAP_UPDATE record, BKP(1) normally is associated with the source
page and BKP(2) is associated with the destination page --- but if these are
the same page, only BKP(1) would have been set.
For this reason as well as the risk of deadlocking on buffer locks, it's best
to design WAL records so that they reflect small atomic actions involving just
one or a few pages. The current XLOG infrastructure cannot handle WAL records
involving references to more than three shared buffers, anyway.
In the case where the WAL record contains enough information to re-generate
the entire contents of a page, do *not* show that page's buffer ID in the
rdata array, even if some of the rdata items point into the buffer. This is
because you don't want XLogInsert to log the whole page contents. The
standard replay-routine pattern for this case is
reln = XLogOpenRelation(rnode);
buffer = XLogReadBuffer(reln, blkno, true);
Assert(BufferIsValid(buffer));
page = (Page) BufferGetPage(buffer);
... initialize the page ...
PageSetLSN(page, lsn);
PageSetTLI(page, ThisTimeLineID);
LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
WriteBuffer(buffer);
In the case where the WAL record provides only enough information to
incrementally update the page, the rdata array *must* mention the buffer
ID at least once; otherwise there is no defense against torn-page problems.
The standard replay-routine pattern for this case is
if (record->xl_info & XLR_BKP_BLOCK_n)
<< do nothing, page was rewritten from logged copy >>;
reln = XLogOpenRelation(rnode);
buffer = XLogReadBuffer(reln, blkno, false);
if (!BufferIsValid(buffer))
<< do nothing, page has been deleted >>;
page = (Page) BufferGetPage(buffer);
if (XLByteLE(lsn, PageGetLSN(page)))
{
/* changes are already applied */
LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
ReleaseBuffer(buffer);
return;
}
... apply the change ...
PageSetLSN(page, lsn);
PageSetTLI(page, ThisTimeLineID);
LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
WriteBuffer(buffer);
As noted above, for a multi-page update you need to be able to determine
which XLR_BKP_BLOCK_n flag applies to each page. If a WAL record reflects
a combination of fully-rewritable and incremental updates, then the rewritable
pages don't count for the XLR_BKP_BLOCK_n numbering. (XLR_BKP_BLOCK_n is
associated with the n'th distinct buffer ID seen in the "rdata" array, and
per the above discussion, fully-rewritable buffers shouldn't be mentioned in
"rdata".)
Due to all these constraints, complex changes (such as a multilevel index
insertion) normally need to be described by a series of atomic-action WAL
records. What do you do if the intermediate states are not self-consistent?
The answer is that the WAL replay logic has to be able to fix things up.
In btree indexes, for example, a page split requires insertion of a new key in
the parent btree level, but for locking reasons this has to be reflected by
two separate WAL records. The replay code has to remember "unfinished" split
operations, and match them up to subsequent insertions in the parent level.
If no matching insert has been found by the time the WAL replay ends, the
replay code has to do the insertion on its own to restore the index to
consistency.
......@@ -10,7 +10,7 @@
*
*
* IDENTIFICATION
* $PostgreSQL: pgsql/src/backend/access/transam/xact.c,v 1.218 2006/03/24 04:32:12 tgl Exp $
* $PostgreSQL: pgsql/src/backend/access/transam/xact.c,v 1.219 2006/03/29 21:17:37 tgl Exp $
*
*-------------------------------------------------------------------------
*/
......@@ -4097,7 +4097,7 @@ xact_redo_commit(xl_xact_commit *xlrec, TransactionId xid)
/* Make sure files supposed to be dropped are dropped */
for (i = 0; i < xlrec->nrels; i++)
{
XLogCloseRelation(xlrec->xnodes[i]);
XLogDropRelation(xlrec->xnodes[i]);
smgrdounlink(smgropen(xlrec->xnodes[i]), false, true);
}
}
......@@ -4132,7 +4132,7 @@ xact_redo_abort(xl_xact_abort *xlrec, TransactionId xid)
/* Make sure files supposed to be dropped are dropped */
for (i = 0; i < xlrec->nrels; i++)
{
XLogCloseRelation(xlrec->xnodes[i]);
XLogDropRelation(xlrec->xnodes[i]);
smgrdounlink(smgropen(xlrec->xnodes[i]), false, true);
}
}
......
......@@ -7,7 +7,7 @@
* Portions Copyright (c) 1996-2006, PostgreSQL Global Development Group
* Portions Copyright (c) 1994, Regents of the University of California
*
* $PostgreSQL: pgsql/src/backend/access/transam/xlog.c,v 1.229 2006/03/28 22:01:16 tgl Exp $
* $PostgreSQL: pgsql/src/backend/access/transam/xlog.c,v 1.230 2006/03/29 21:17:37 tgl Exp $
*
*-------------------------------------------------------------------------
*/
......@@ -2509,35 +2509,29 @@ RestoreBkpBlocks(XLogRecord *record, XLogRecPtr lsn)
blk += sizeof(BkpBlock);
reln = XLogOpenRelation(bkpb.node);
buffer = XLogReadBuffer(reln, bkpb.block, true);
Assert(BufferIsValid(buffer));
page = (Page) BufferGetPage(buffer);
if (reln)
if (bkpb.hole_length == 0)
{
buffer = XLogReadBuffer(true, reln, bkpb.block);
if (BufferIsValid(buffer))
{
page = (Page) BufferGetPage(buffer);
if (bkpb.hole_length == 0)
{
memcpy((char *) page, blk, BLCKSZ);
}
else
{
/* must zero-fill the hole */
MemSet((char *) page, 0, BLCKSZ);
memcpy((char *) page, blk, bkpb.hole_offset);
memcpy((char *) page + (bkpb.hole_offset + bkpb.hole_length),
blk + bkpb.hole_offset,
BLCKSZ - (bkpb.hole_offset + bkpb.hole_length));
}
PageSetLSN(page, lsn);
PageSetTLI(page, ThisTimeLineID);
LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
WriteBuffer(buffer);
}
memcpy((char *) page, blk, BLCKSZ);
}
else
{
/* must zero-fill the hole */
MemSet((char *) page, 0, BLCKSZ);
memcpy((char *) page, blk, bkpb.hole_offset);
memcpy((char *) page + (bkpb.hole_offset + bkpb.hole_length),
blk + bkpb.hole_offset,
BLCKSZ - (bkpb.hole_offset + bkpb.hole_length));
}
PageSetLSN(page, lsn);
PageSetTLI(page, ThisTimeLineID);
LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
WriteBuffer(buffer);
blk += BLCKSZ - bkpb.hole_length;
}
}
......@@ -5451,25 +5445,19 @@ xlog_desc(StringInfo buf, uint8 xl_info, char *rec)
static void
xlog_outrec(StringInfo buf, XLogRecord *record)
{
int bkpb;
int i;
appendStringInfo(buf, "prev %X/%X; xid %u",
record->xl_prev.xlogid, record->xl_prev.xrecoff,
record->xl_xid);
record->xl_prev.xlogid, record->xl_prev.xrecoff,
record->xl_xid);
for (i = 0, bkpb = 0; i < XLR_MAX_BKP_BLOCKS; i++)
for (i = 0; i < XLR_MAX_BKP_BLOCKS; i++)
{
if (!(record->xl_info & (XLR_SET_BKP_BLOCK(i))))
continue;
bkpb++;
if (record->xl_info & XLR_SET_BKP_BLOCK(i))
appendStringInfo(buf, "; bkpb%d", i+1);
}
if (bkpb)
appendStringInfo(buf, "; bkpb %d", bkpb);
appendStringInfo(buf, ": %s",
RmgrTable[record->xl_rmid].rm_name);
appendStringInfo(buf, ": %s", RmgrTable[record->xl_rmid].rm_name);
}
#endif /* WAL_DEBUG */
......
......@@ -11,7 +11,7 @@
* Portions Copyright (c) 1996-2006, PostgreSQL Global Development Group
* Portions Copyright (c) 1994, Regents of the University of California
*
* $PostgreSQL: pgsql/src/backend/access/transam/xlogutils.c,v 1.41 2006/03/05 15:58:22 momjian Exp $
* $PostgreSQL: pgsql/src/backend/access/transam/xlogutils.c,v 1.42 2006/03/29 21:17:38 tgl Exp $
*
*-------------------------------------------------------------------------
*/
......@@ -19,44 +19,81 @@
#include "access/xlogutils.h"
#include "storage/bufmgr.h"
#include "storage/bufpage.h"
#include "storage/smgr.h"
#include "utils/hsearch.h"
/*
* XLogReadBuffer
* Read a page during XLOG replay
*
* Storage related support functions
* This is functionally comparable to ReadBuffer followed by
* LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE): you get back a pinned
* and locked buffer. (The lock is not really necessary, since we
* expect that this is only done during single-process XLOG replay,
* but in some places it simplifies sharing code with the non-XLOG case.)
*
* If "init" is true then the caller intends to rewrite the page fully
* using the info in the XLOG record. In this case we will extend the
* relation if needed to make the page exist, and we will not complain about
* the page being "new" (all zeroes).
*
* If "init" is false then the caller needs the page to be valid already.
* If the page doesn't exist or contains zeroes, we report failure.
*
* If the return value is InvalidBuffer (only possible when init = false),
* the caller should silently skip the update on this page. This currently
* never happens, but we retain it as part of the API spec for possible future
* use.
*/
Buffer
XLogReadBuffer(bool extend, Relation reln, BlockNumber blkno)
XLogReadBuffer(Relation reln, BlockNumber blkno, bool init)
{
BlockNumber lastblock = RelationGetNumberOfBlocks(reln);
Buffer buffer;
if (blkno >= lastblock)
Assert(blkno != P_NEW);
if (blkno < lastblock)
{
/* page exists in file */
buffer = ReadBuffer(reln, blkno);
}
else
{
/* hm, page doesn't exist in file */
if (!init)
elog(PANIC, "block %u of relation %u/%u/%u does not exist",
blkno, reln->rd_node.spcNode,
reln->rd_node.dbNode, reln->rd_node.relNode);
/* OK to extend the file */
/* we do this in recovery only - no rel-extension lock needed */
Assert(InRecovery);
buffer = InvalidBuffer;
if (extend) /* we do this in recovery only - no locks */
while (blkno >= lastblock)
{
Assert(InRecovery);
while (lastblock <= blkno)
{
if (buffer != InvalidBuffer)
ReleaseBuffer(buffer); /* must be WriteBuffer()? */
buffer = ReadBuffer(reln, P_NEW);
lastblock++;
}
if (buffer != InvalidBuffer)
ReleaseBuffer(buffer); /* must be WriteBuffer()? */
buffer = ReadBuffer(reln, P_NEW);
lastblock++;
}
if (buffer != InvalidBuffer)
LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE);
return buffer;
Assert(BufferGetBlockNumber(buffer) == blkno);
}
LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE);
if (!init)
{
/* check that page has been initialized */
Page page = (Page) BufferGetPage(buffer);
if (PageIsNew((PageHeader) page))
elog(PANIC, "block %u of relation %u/%u/%u is uninitialized",
blkno, reln->rd_node.spcNode,
reln->rd_node.dbNode, reln->rd_node.relNode);
}
buffer = ReadBuffer(reln, blkno);
if (buffer != InvalidBuffer)
LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE);
return buffer;
}
......@@ -184,6 +221,9 @@ XLogCloseRelationCache(void)
/*
* Open a relation during XLOG replay
*
* Note: this once had an API that allowed NULL return on failure, but it
* no longer does; any failure results in elog().
*/
Relation
XLogOpenRelation(RelFileNode rnode)
......@@ -224,7 +264,7 @@ XLogOpenRelation(RelFileNode rnode)
hash_search(_xlrelcache, (void *) &rnode, HASH_ENTER, &found);
if (found)
elog(PANIC, "XLogOpenRelation: file found on insert into cache");
elog(PANIC, "xlog relation already present on insert into cache");
hentry->rdesc = res;
......@@ -253,7 +293,7 @@ XLogOpenRelation(RelFileNode rnode)
}
/*
* Close a relation during XLOG replay
* Drop a relation during XLOG replay
*
* This is called when the relation is about to be deleted; we need to ensure
* that there is no dangling smgr reference in the xlog relation cache.
......@@ -262,7 +302,7 @@ XLogOpenRelation(RelFileNode rnode)
* cache, we just let it age out normally.
*/
void
XLogCloseRelation(RelFileNode rnode)
XLogDropRelation(RelFileNode rnode)
{
XLogRelDesc *rdesc;
XLogRelCacheEntry *hentry;
......@@ -277,3 +317,25 @@ XLogCloseRelation(RelFileNode rnode)
RelationCloseSmgr(&(rdesc->reldata));
}
/*
 * XLogDropDatabase
 *		Release smgr references for a dropped database during XLOG replay
 *
 * This is the DROP DATABASE counterpart of dropping a single relation:
 * every entry in the xlog relation cache is visited, and for each entry
 * whose RelFileNode belongs to database "dbid" the relation's smgr
 * reference is closed, so that nothing dangling remains once the
 * database's files are removed.
 *
 * The cache entries themselves are not removed here; only their smgr
 * references are closed.
 */
void
XLogDropDatabase(Oid dbid)
{
	HASH_SEQ_STATUS scan;
	XLogRelCacheEntry *entry;

	hash_seq_init(&scan, _xlrelcache);

	for (;;)
	{
		XLogRelDesc *desc;

		entry = (XLogRelCacheEntry *) hash_seq_search(&scan);
		if (entry == NULL)
			break;				/* whole cache has been scanned */

		/* relations of other databases are left alone */
		if (entry->rnode.dbNode != dbid)
			continue;

		desc = entry->rdesc;
		RelationCloseSmgr(&(desc->reldata));
	}
}
......@@ -15,7 +15,7 @@
*
*
* IDENTIFICATION
* $PostgreSQL: pgsql/src/backend/commands/dbcommands.c,v 1.178 2006/03/24 04:32:13 tgl Exp $
* $PostgreSQL: pgsql/src/backend/commands/dbcommands.c,v 1.179 2006/03/29 21:17:38 tgl Exp $
*
*-------------------------------------------------------------------------
*/
......@@ -671,7 +671,7 @@ dropdb(const char *dbname, bool missing_ok)
* is important to ensure that no remaining backend tries to write out a
* dirty buffer to the dead database later...
*/
DropBuffers(db_id);
DropDatabaseBuffers(db_id);
/*
* Also, clean out any entries in the shared free space map.
......@@ -1377,11 +1377,16 @@ dbase_redo(XLogRecPtr lsn, XLogRecord *record)
dst_path = GetDatabasePath(xlrec->db_id, xlrec->tablespace_id);
/*
* Drop pages for this database that are in the shared buffer cache
*/
DropBuffers(xlrec->db_id);
/* Drop pages for this database that are in the shared buffer cache */
DropDatabaseBuffers(xlrec->db_id);
/* Also, clean out any entries in the shared free space map */
FreeSpaceMapForgetDatabase(xlrec->db_id);
/* Clean out the xlog relcache too */
XLogDropDatabase(xlrec->db_id);
/* And remove the physical files */
if (!rmtree(dst_path, true))
ereport(WARNING,
(errmsg("could not remove database directory \"%s\"",
......
......@@ -8,7 +8,7 @@
*
*
* IDENTIFICATION
* $PostgreSQL: pgsql/src/backend/commands/sequence.c,v 1.130 2006/03/24 04:32:13 tgl Exp $
* $PostgreSQL: pgsql/src/backend/commands/sequence.c,v 1.131 2006/03/29 21:17:38 tgl Exp $
*
*-------------------------------------------------------------------------
*/
......@@ -1140,14 +1140,8 @@ seq_redo(XLogRecPtr lsn, XLogRecord *record)
elog(PANIC, "seq_redo: unknown op code %u", info);
reln = XLogOpenRelation(xlrec->node);
if (!RelationIsValid(reln))
return;
buffer = XLogReadBuffer(true, reln, 0);
if (!BufferIsValid(buffer))
elog(PANIC, "seq_redo: can't read block 0 of rel %u/%u/%u",
xlrec->node.spcNode, xlrec->node.dbNode, xlrec->node.relNode);
buffer = XLogReadBuffer(reln, 0, true);
Assert(BufferIsValid(buffer));
page = (Page) BufferGetPage(buffer);
/* Always reinit the page and reinstall the magic number */
......
......@@ -37,7 +37,7 @@
*
*
* IDENTIFICATION
* $PostgreSQL: pgsql/src/backend/commands/tablespace.c,v 1.33 2006/03/29 15:15:43 tgl Exp $
* $PostgreSQL: pgsql/src/backend/commands/tablespace.c,v 1.34 2006/03/29 21:17:38 tgl Exp $
*
*-------------------------------------------------------------------------
*/
......@@ -467,6 +467,12 @@ DropTableSpace(DropTableSpaceStmt *stmt)
(void) XLogInsert(RM_TBLSPC_ID, XLOG_TBLSPC_DROP, rdata);
}
/*
* Note: because we checked that the tablespace was empty, there should
* be no need to worry about flushing shared buffers or free space map
* entries for relations in the tablespace.
*/
/*
* Allow TablespaceCreateDbspace again.
*/
......
......@@ -8,7 +8,7 @@
*
*
* IDENTIFICATION
* $PostgreSQL: pgsql/src/backend/storage/buffer/bufmgr.c,v 1.204 2006/03/05 15:58:36 momjian Exp $
* $PostgreSQL: pgsql/src/backend/storage/buffer/bufmgr.c,v 1.205 2006/03/29 21:17:39 tgl Exp $
*
*-------------------------------------------------------------------------
*/
......@@ -1450,7 +1450,7 @@ DropRelFileNodeBuffers(RelFileNode rnode, bool istemp,
}
/* ---------------------------------------------------------------------
* DropBuffers
* DropDatabaseBuffers
*
* This function removes all the buffers in the buffer cache for a
* particular database. Dirty pages are simply dropped, without
......@@ -1461,7 +1461,7 @@ DropRelFileNodeBuffers(RelFileNode rnode, bool istemp,
* --------------------------------------------------------------------
*/
void
DropBuffers(Oid dbid)
DropDatabaseBuffers(Oid dbid)
{
int i;
volatile BufferDesc *bufHdr;
......
......@@ -8,7 +8,7 @@
*
*
* IDENTIFICATION
* $PostgreSQL: pgsql/src/backend/utils/init/postinit.c,v 1.161 2006/03/05 15:58:46 momjian Exp $
* $PostgreSQL: pgsql/src/backend/utils/init/postinit.c,v 1.162 2006/03/29 21:17:39 tgl Exp $
*
*
*-------------------------------------------------------------------------
......@@ -166,7 +166,7 @@ ReverifyMyDatabase(const char *name)
* other backend will eventually try to write them and die in
* mdblindwrt. Drop any such pages to forestall trouble.
*/
DropBuffers(MyDatabaseId);
DropDatabaseBuffers(MyDatabaseId);
/* Now I can commit hara-kiri with a clear conscience... */
ereport(FATAL,
(errcode(ERRCODE_UNDEFINED_DATABASE),
......
......@@ -6,7 +6,7 @@
* Portions Copyright (c) 1996-2006, PostgreSQL Global Development Group
* Portions Copyright (c) 1994, Regents of the University of California
*
* $PostgreSQL: pgsql/src/include/access/xlogutils.h,v 1.19 2006/03/05 15:58:54 momjian Exp $
* $PostgreSQL: pgsql/src/include/access/xlogutils.h,v 1.20 2006/03/29 21:17:39 tgl Exp $
*/
#ifndef XLOG_UTILS_H
#define XLOG_UTILS_H
......@@ -19,8 +19,9 @@ extern void XLogInitRelationCache(void);
extern void XLogCloseRelationCache(void);
extern Relation XLogOpenRelation(RelFileNode rnode);
extern void XLogCloseRelation(RelFileNode rnode);
extern void XLogDropRelation(RelFileNode rnode);
extern void XLogDropDatabase(Oid dbid);
extern Buffer XLogReadBuffer(bool extend, Relation reln, BlockNumber blkno);
extern Buffer XLogReadBuffer(Relation reln, BlockNumber blkno, bool init);
#endif
......@@ -7,7 +7,7 @@
* Portions Copyright (c) 1996-2006, PostgreSQL Global Development Group
* Portions Copyright (c) 1994, Regents of the University of California
*
* $PostgreSQL: pgsql/src/include/storage/bufmgr.h,v 1.98 2006/03/05 15:58:59 momjian Exp $
* $PostgreSQL: pgsql/src/include/storage/bufmgr.h,v 1.99 2006/03/29 21:17:39 tgl Exp $
*
*-------------------------------------------------------------------------
*/
......@@ -135,7 +135,7 @@ extern void RelationTruncate(Relation rel, BlockNumber nblocks);
extern void FlushRelationBuffers(Relation rel);
extern void DropRelFileNodeBuffers(RelFileNode rnode, bool istemp,
BlockNumber firstDelBlock);
extern void DropBuffers(Oid dbid);
extern void DropDatabaseBuffers(Oid dbid);
#ifdef NOT_USED
extern void PrintPinnedBufs(void);
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment