Commit 7d4d5c00 authored by Tom Lane's avatar Tom Lane

Arrange to recycle old XLOG log segment files as new segment files,

rather than deleting them only to have to create more.  Steady state
is 2*CHECKPOINT_SEGMENTS + WAL_FILES + 1 segment files, which will
simply be renamed rather than constantly deleted and recreated.
To make this safe, added current XLOG file/offset number to page
header of XLOG pages, so that an un-overwritten page from an old
incarnation of a logfile can be reliably told from a valid page.
This change means that if you try to restart postmaster in a CVS-tip
database after installing the change, you'll get a complaint about
bad XLOG page magic number.  If you don't want to initdb, run
contrib/pg_resetxlog (and be sure you shut down the old postmaster
cleanly).
parent 8b77efdd
......@@ -23,7 +23,7 @@
* Portions Copyright (c) 1996-2001, PostgreSQL Global Development Group
* Portions Copyright (c) 1994, Regents of the University of California
*
* $Header: /cvsroot/pgsql/contrib/pg_resetxlog/Attic/pg_resetxlog.c,v 1.5 2001/06/06 17:07:38 tgl Exp $
* $Header: /cvsroot/pgsql/contrib/pg_resetxlog/Attic/pg_resetxlog.c,v 1.6 2001/07/19 02:12:34 tgl Exp $
*
*-------------------------------------------------------------------------
*/
......@@ -857,6 +857,10 @@ WriteEmptyXLOG(void)
page->xlp_magic = XLOG_PAGE_MAGIC;
page->xlp_info = 0;
page->xlp_sui = ControlFile.checkPointCopy.ThisStartUpID;
page->xlp_pageaddr.xlogid =
ControlFile.checkPointCopy.redo.xlogid;
page->xlp_pageaddr.xrecoff =
ControlFile.checkPointCopy.redo.xrecoff - SizeOfXLogPHD;
record = (XLogRecord *) ((char *) page + SizeOfXLogPHD);
record->xl_prev.xlogid = 0;
record->xl_prev.xrecoff = 0;
......
......@@ -7,7 +7,7 @@
* Portions Copyright (c) 1996-2001, PostgreSQL Global Development Group
* Portions Copyright (c) 1994, Regents of the University of California
*
* $Header: /cvsroot/pgsql/src/backend/access/transam/xlog.c,v 1.70 2001/06/21 19:45:45 tgl Exp $
* $Header: /cvsroot/pgsql/src/backend/access/transam/xlog.c,v 1.71 2001/07/19 02:12:34 tgl Exp $
*
*-------------------------------------------------------------------------
*/
......@@ -92,22 +92,35 @@
/* User-settable parameters */
int CheckPointSegments = 3;
int XLOGbuffers = 8;
int XLOGfiles = 0; /* how many files to pre-allocate during
* ckpt */
int XLOGfiles = 0; /* # of files to preallocate during ckpt */
int XLOG_DEBUG = 0;
char *XLOG_sync_method = NULL;
const char XLOG_sync_method_default[] = DEFAULT_SYNC_METHOD_STR;
char XLOG_archive_dir[MAXPGPATH]; /* null string means
* delete 'em */
/*
* XLOGfileslop is used in the code as the allowed "fuzz" in the number of
* preallocated XLOG segments --- we try to have at least XLOGfiles advance
* segments but no more than XLOGfiles+XLOGfileslop segments. This could
* be made a separate GUC variable, but at present I think it's sufficient
* to hardwire it as 2*CheckPointSegments+1. Under normal conditions, a
* checkpoint will free no more than 2*CheckPointSegments log segments, and
* we want to recycle all of them; the +1 allows boundary cases to happen
* without wasting a delete/create-segment cycle.
*/
#define XLOGfileslop (2*CheckPointSegments + 1)
/* these are derived from XLOG_sync_method by assign_xlog_sync_method */
static int sync_method = DEFAULT_SYNC_METHOD;
static int open_sync_bit = DEFAULT_SYNC_FLAGBIT;
#define MinXLOGbuffers 4
#define XLOG_SYNC_BIT (enableFsync ? open_sync_bit : 0)
#define MinXLOGbuffers 4
/*
* ThisStartUpID will be same in all backends --- it identifies current
......@@ -405,9 +418,12 @@ static bool AdvanceXLInsertBuffer(void);
static void XLogWrite(XLogwrtRqst WriteRqst);
static int XLogFileInit(uint32 log, uint32 seg,
bool *use_existent, bool use_lock);
static bool InstallXLogFileSegment(uint32 log, uint32 seg, char *tmppath,
bool find_free, int max_advance,
bool use_lock);
static int XLogFileOpen(uint32 log, uint32 seg, bool econt);
static void PreallocXlogFiles(XLogRecPtr endptr);
static void MoveOfflineLogs(uint32 log, uint32 seg);
static void MoveOfflineLogs(uint32 log, uint32 seg, XLogRecPtr endptr);
static XLogRecord *ReadRecord(XLogRecPtr *RecPtr, int emode, char *buffer);
static bool ValidXLOGHeader(XLogPageHeader hdr, int emode, bool checkSUI);
static XLogRecord *ReadCheckpointRecord(XLogRecPtr RecPtr,
......@@ -856,6 +872,8 @@ AdvanceXLInsertBuffer(void)
bool update_needed = true;
XLogRecPtr OldPageRqstPtr;
XLogwrtRqst WriteRqst;
XLogRecPtr NewPageEndPtr;
XLogPageHeader NewPage;
/* Use Insert->LogwrtResult copy if it's more fresh */
if (XLByteLT(LogwrtResult.Write, Insert->LogwrtResult.Write))
......@@ -930,32 +948,35 @@ AdvanceXLInsertBuffer(void)
* Now the next buffer slot is free and we can set it up to be the
* next output page.
*/
if (XLogCtl->xlblocks[Insert->curridx].xrecoff >= XLogFileSize)
NewPageEndPtr = XLogCtl->xlblocks[Insert->curridx];
if (NewPageEndPtr.xrecoff >= XLogFileSize)
{
/* crossing a logid boundary */
XLogCtl->xlblocks[nextidx].xlogid =
XLogCtl->xlblocks[Insert->curridx].xlogid + 1;
XLogCtl->xlblocks[nextidx].xrecoff = BLCKSZ;
NewPageEndPtr.xlogid += 1;
NewPageEndPtr.xrecoff = BLCKSZ;
}
else
{
XLogCtl->xlblocks[nextidx].xlogid =
XLogCtl->xlblocks[Insert->curridx].xlogid;
XLogCtl->xlblocks[nextidx].xrecoff =
XLogCtl->xlblocks[Insert->curridx].xrecoff + BLCKSZ;
NewPageEndPtr.xrecoff += BLCKSZ;
}
XLogCtl->xlblocks[nextidx] = NewPageEndPtr;
NewPage = (XLogPageHeader) (XLogCtl->pages + nextidx * BLCKSZ);
Insert->curridx = nextidx;
Insert->currpage = (XLogPageHeader) (XLogCtl->pages + nextidx * BLCKSZ);
Insert->currpos = ((char *) Insert->currpage) + SizeOfXLogPHD;
Insert->currpage = NewPage;
Insert->currpos = ((char *) NewPage) + SizeOfXLogPHD;
/*
* Be sure to re-zero the buffer so that bytes beyond what we've
* written will look like zeroes and not valid XLOG records...
*/
MemSet((char *) Insert->currpage, 0, BLCKSZ);
Insert->currpage->xlp_magic = XLOG_PAGE_MAGIC;
/* Insert->currpage->xlp_info = 0; *//* done by memset */
Insert->currpage->xlp_sui = ThisStartUpID;
MemSet((char *) NewPage, 0, BLCKSZ);
/* And fill the new page's header */
NewPage->xlp_magic = XLOG_PAGE_MAGIC;
/* NewPage->xlp_info = 0; */ /* done by memset */
NewPage->xlp_sui = ThisStartUpID;
NewPage->xlp_pageaddr.xlogid = NewPageEndPtr.xlogid;
NewPage->xlp_pageaddr.xrecoff = NewPageEndPtr.xrecoff - BLCKSZ;
return update_needed;
}
......@@ -1273,10 +1294,7 @@ XLogFileInit(uint32 log, uint32 seg,
{
char path[MAXPGPATH];
char tmppath[MAXPGPATH];
char targpath[MAXPGPATH];
char zbuffer[BLCKSZ];
uint32 targlog,
targseg;
int fd;
int nbytes;
......@@ -1352,32 +1370,96 @@ XLogFileInit(uint32 log, uint32 seg,
close(fd);
/*
* Now move the segment into place with its final name. We want to be
* sure that only one process does this at a time.
*/
if (use_lock)
SpinAcquire(ControlFileLockId);
/*
* Now move the segment into place with its final name.
*
* If caller didn't want to use a pre-existing file, get rid of any
* pre-existing file. Otherwise, cope with possibility that someone
* else has created the file while we were filling ours: if so, use
* ours to pre-create a future log segment.
*/
targlog = log;
targseg = seg;
strcpy(targpath, path);
if (!InstallXLogFileSegment(log, seg, tmppath,
*use_existent, XLOGfiles + XLOGfileslop,
use_lock))
{
/* No need for any more future segments... */
unlink(tmppath);
}
/* Set flag to tell caller there was no existent file */
*use_existent = false;
/* Now open original target segment (might not be file I just made) */
fd = BasicOpenFile(path, O_RDWR | PG_BINARY | XLOG_SYNC_BIT,
S_IRUSR | S_IWUSR);
if (fd < 0)
elog(STOP, "open of %s (log file %u, segment %u) failed: %m",
path, log, seg);
return (fd);
}
/*
* Install a new XLOG segment file as a current or future log segment.
*
* This is used both to install a newly-created segment (which has a temp
* filename while it's being created) and to recycle an old segment.
*
* log, seg: identify segment to install as (or first possible target).
*
* tmppath: initial name of file to install. It will be renamed into place.
*
* find_free: if TRUE, install the new segment at the first empty log/seg
* number at or after the passed numbers. If FALSE, install the new segment
* exactly where specified, deleting any existing segment file there.
*
* max_advance: maximum number of log/seg slots to advance past the starting
* point. Fail if no free slot is found in this range. (Irrelevant if
* find_free is FALSE.)
*
* use_lock: if TRUE, acquire ControlFileLock spinlock while moving file into
* place. This should be TRUE except during bootstrap log creation. The
* caller must *not* hold the spinlock at call.
*
* Returns TRUE if file installed, FALSE if not installed because of
* exceeding max_advance limit. (Any other kind of failure causes elog().)
*/
static bool
InstallXLogFileSegment(uint32 log, uint32 seg, char *tmppath,
bool find_free, int max_advance,
bool use_lock)
{
char path[MAXPGPATH];
int fd;
XLogFileName(path, log, seg);
/*
* We want to be sure that only one process does this at a time.
*/
if (use_lock)
SpinAcquire(ControlFileLockId);
if (!*use_existent)
unlink(targpath);
if (!find_free)
{
/* Force installation: get rid of any pre-existing segment file */
unlink(path);
}
else
{
while ((fd = BasicOpenFile(targpath, O_RDWR | PG_BINARY,
/* Find a free slot to put it in */
while ((fd = BasicOpenFile(path, O_RDWR | PG_BINARY,
S_IRUSR | S_IWUSR)) >= 0)
{
close(fd);
NextLogSeg(targlog, targseg);
XLogFileName(targpath, targlog, targseg);
if (--max_advance < 0)
{
/* Failed to find a free slot within specified range */
if (use_lock)
SpinRelease(ControlFileLockId);
return false;
}
NextLogSeg(log, seg);
XLogFileName(path, log, seg);
}
}
......@@ -1387,30 +1469,20 @@ XLogFileInit(uint32 log, uint32 seg,
* rename() is an acceptable substitute except for the truly paranoid.
*/
#ifndef __BEOS__
if (link(tmppath, targpath) < 0)
if (link(tmppath, path) < 0)
elog(STOP, "link from %s to %s (initialization of log file %u, segment %u) failed: %m",
tmppath, targpath, targlog, targseg);
tmppath, path, log, seg);
unlink(tmppath);
#else
if (rename(tmppath, targpath) < 0)
if (rename(tmppath, path) < 0)
elog(STOP, "rename from %s to %s (initialization of log file %u, segment %u) failed: %m",
tmppath, targpath targlog, targseg);
tmppath, path, log, seg);
#endif
if (use_lock)
SpinRelease(ControlFileLockId);
/* Set flag to tell caller there was no existent file */
*use_existent = false;
/* Now open original target segment (might not be file I just made) */
fd = BasicOpenFile(path, O_RDWR | PG_BINARY | XLOG_SYNC_BIT,
S_IRUSR | S_IWUSR);
if (fd < 0)
elog(STOP, "open of %s (log file %u, segment %u) failed: %m",
path, log, seg);
return (fd);
return true;
}
/*
......@@ -1477,20 +1549,26 @@ PreallocXlogFiles(XLogRecPtr endptr)
/*
* Remove or move offline all log files older or equal to passed log/seg#
*
* endptr is current (or recent) end of xlog; this is used to determine
* whether we want to recycle rather than delete no-longer-wanted log files.
*/
static void
MoveOfflineLogs(uint32 log, uint32 seg)
MoveOfflineLogs(uint32 log, uint32 seg, XLogRecPtr endptr)
{
uint32 endlogId;
uint32 endlogSeg;
DIR *xldir;
struct dirent *xlde;
char lastoff[32];
char path[MAXPGPATH];
Assert(XLOG_archive_dir[0] == 0); /* not implemented yet */
XLByteToPrevSeg(endptr, endlogId, endlogSeg);
xldir = opendir(XLogDir);
if (xldir == NULL)
elog(STOP, "could not open transaction log directory (%s): %m", XLogDir);
elog(STOP, "could not open transaction log directory (%s): %m",
XLogDir);
sprintf(lastoff, "%08X%08X", log, seg);
......@@ -1501,19 +1579,42 @@ MoveOfflineLogs(uint32 log, uint32 seg)
strspn(xlde->d_name, "0123456789ABCDEF") == 16 &&
strcmp(xlde->d_name, lastoff) <= 0)
{
sprintf(path, "%s/%s", XLogDir, xlde->d_name);
if (XLOG_archive_dir[0])
elog(LOG, "archiving transaction log file %s", xlde->d_name);
{
elog(LOG, "archiving transaction log file %s",
xlde->d_name);
elog(NOTICE, "archiving log files is not implemented!");
}
else
elog(LOG, "removing transaction log file %s", xlde->d_name);
sprintf(path, "%s/%s", XLogDir, xlde->d_name);
if (XLOG_archive_dir[0] == 0)
unlink(path);
{
/*
* Before deleting the file, see if it can be recycled as
* a future log segment. We allow recycling segments up to
* XLOGfiles + XLOGfileslop segments beyond the current
* XLOG location.
*/
if (InstallXLogFileSegment(endlogId, endlogSeg, path,
true, XLOGfiles + XLOGfileslop,
true))
{
elog(LOG, "recycled transaction log file %s",
xlde->d_name);
}
else
{
/* No need for any more future segments... */
elog(LOG, "removing transaction log file %s",
xlde->d_name);
unlink(path);
}
}
}
errno = 0;
}
if (errno)
elog(STOP, "could not read transaction log directory (%s): %m", XLogDir);
elog(STOP, "could not read transaction log directory (%s): %m",
XLogDir);
closedir(xldir);
}
......@@ -1866,6 +1967,8 @@ next_record_is_invalid:;
static bool
ValidXLOGHeader(XLogPageHeader hdr, int emode, bool checkSUI)
{
XLogRecPtr recaddr;
if (hdr->xlp_magic != XLOG_PAGE_MAGIC)
{
elog(emode, "ReadRecord: invalid magic number %04X in log file %u, segment %u, offset %u",
......@@ -1878,6 +1981,15 @@ ValidXLOGHeader(XLogPageHeader hdr, int emode, bool checkSUI)
hdr->xlp_info, readId, readSeg, readOff);
return false;
}
recaddr.xlogid = readId;
recaddr.xrecoff = readSeg * XLogSegSize + readOff;
if (!XLByteEQ(hdr->xlp_pageaddr, recaddr))
{
elog(emode, "ReadRecord: unexpected pageaddr (%u, %u) in log file %u, segment %u, offset %u",
hdr->xlp_pageaddr.xlogid, hdr->xlp_pageaddr.xrecoff,
readId, readSeg, readOff);
return false;
}
/*
* We disbelieve a SUI less than the previous page's SUI, or more than
......@@ -2248,6 +2360,8 @@ BootStrapXLOG(void)
page->xlp_magic = XLOG_PAGE_MAGIC;
page->xlp_info = 0;
page->xlp_sui = checkPoint.ThisStartUpID;
page->xlp_pageaddr.xlogid = 0;
page->xlp_pageaddr.xrecoff = 0;
record = (XLogRecord *) ((char *) page + SizeOfXLogPHD);
record->xl_prev.xlogid = 0;
record->xl_prev.xrecoff = 0;
......@@ -2500,23 +2614,29 @@ StartupXLOG(void)
EndOfLog.xrecoff += (BLCKSZ - EndOfLog.xrecoff % BLCKSZ);
if (EndOfLog.xrecoff % BLCKSZ == 0)
{
if (EndOfLog.xrecoff >= XLogFileSize)
XLogRecPtr NewPageEndPtr;
NewPageEndPtr = EndOfLog;
if (NewPageEndPtr.xrecoff >= XLogFileSize)
{
XLogCtl->xlblocks[0].xlogid = EndOfLog.xlogid + 1;
XLogCtl->xlblocks[0].xrecoff = BLCKSZ;
/* crossing a logid boundary */
NewPageEndPtr.xlogid += 1;
NewPageEndPtr.xrecoff = BLCKSZ;
}
else
{
XLogCtl->xlblocks[0].xlogid = EndOfLog.xlogid;
XLogCtl->xlblocks[0].xrecoff = EndOfLog.xrecoff + BLCKSZ;
NewPageEndPtr.xrecoff += BLCKSZ;
}
Insert->currpos = (char *) Insert->currpage + SizeOfXLogPHD;
XLogCtl->xlblocks[0] = NewPageEndPtr;
Insert->currpage->xlp_magic = XLOG_PAGE_MAGIC;
if (InRecovery)
Insert->currpage->xlp_sui = ThisStartUpID;
else
Insert->currpage->xlp_sui = ThisStartUpID + 1;
Insert->currpage->xlp_pageaddr.xlogid = NewPageEndPtr.xlogid;
Insert->currpage->xlp_pageaddr.xrecoff = NewPageEndPtr.xrecoff - BLCKSZ;
/* rest of buffer was zeroed in XLOGShmemInit */
Insert->currpos = (char *) Insert->currpage + SizeOfXLogPHD;
}
else
{
......@@ -2916,7 +3036,7 @@ CreateCheckPoint(bool shutdown)
if (_logId || _logSeg)
{
PrevLogSeg(_logId, _logSeg);
MoveOfflineLogs(_logId, _logSeg);
MoveOfflineLogs(_logId, _logSeg, recptr);
}
/*
......
......@@ -6,7 +6,7 @@
* Portions Copyright (c) 1996-2001, PostgreSQL Global Development Group
* Portions Copyright (c) 1994, Regents of the University of California
*
* $Id: xlog.h,v 1.23 2001/03/22 04:00:32 momjian Exp $
* $Id: xlog.h,v 1.24 2001/07/19 02:12:35 tgl Exp $
*/
#ifndef XLOG_H
#define XLOG_H
......@@ -109,13 +109,14 @@ typedef struct XLogContRecord
/*
* Each page of XLOG file has a header like this:
*/
#define XLOG_PAGE_MAGIC 0xD058 /* can be used as WAL version indicator */
#define XLOG_PAGE_MAGIC 0xD059 /* can be used as WAL version indicator */
typedef struct XLogPageHeaderData
{
uint16 xlp_magic; /* magic value for correctness checks */
uint16 xlp_info; /* flag bits, see below */
StartUpID xlp_sui; /* StartUpID of first record on page */
XLogRecPtr xlp_pageaddr; /* XLOG address of this page */
} XLogPageHeaderData;
#define SizeOfXLogPHD MAXALIGN(sizeof(XLogPageHeaderData))
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment