Commit 0a873949 authored by Tom Lane's avatar Tom Lane

Fix the torn-page hazard for PITR base backups by forcing full page writes

to occur between pg_start_backup() and pg_stop_backup(), even if the GUC
setting full_page_writes is OFF.  Per discussion, doing this in combination
with the already-existing checkpoint during pg_start_backup() should ensure
safety against partial page updates being included in the backup.  We do
not have to force full page writes to occur during normal PITR operation,
as I had first feared.
parent 8e7aaeb6
...@@ -7,7 +7,7 @@ ...@@ -7,7 +7,7 @@
* Portions Copyright (c) 1996-2006, PostgreSQL Global Development Group * Portions Copyright (c) 1996-2006, PostgreSQL Global Development Group
* Portions Copyright (c) 1994, Regents of the University of California * Portions Copyright (c) 1994, Regents of the University of California
* *
* $PostgreSQL: pgsql/src/backend/access/transam/xlog.c,v 1.235 2006/04/14 20:27:24 tgl Exp $ * $PostgreSQL: pgsql/src/backend/access/transam/xlog.c,v 1.236 2006/04/17 18:55:05 tgl Exp $
* *
*------------------------------------------------------------------------- *-------------------------------------------------------------------------
*/ */
...@@ -344,6 +344,7 @@ typedef struct XLogCtlInsert ...@@ -344,6 +344,7 @@ typedef struct XLogCtlInsert
XLogPageHeader currpage; /* points to header of block in cache */ XLogPageHeader currpage; /* points to header of block in cache */
char *currpos; /* current insertion point in cache */ char *currpos; /* current insertion point in cache */
XLogRecPtr RedoRecPtr; /* current redo point for insertions */ XLogRecPtr RedoRecPtr; /* current redo point for insertions */
bool forcePageWrites; /* forcing full-page writes for PITR? */
} XLogCtlInsert; } XLogCtlInsert;
/* /*
...@@ -466,7 +467,7 @@ static void exitArchiveRecovery(TimeLineID endTLI, ...@@ -466,7 +467,7 @@ static void exitArchiveRecovery(TimeLineID endTLI,
uint32 endLogId, uint32 endLogSeg); uint32 endLogId, uint32 endLogSeg);
static bool recoveryStopsHere(XLogRecord *record, bool *includeThis); static bool recoveryStopsHere(XLogRecord *record, bool *includeThis);
static bool XLogCheckBuffer(XLogRecData *rdata, static bool XLogCheckBuffer(XLogRecData *rdata, bool doPageWrites,
XLogRecPtr *lsn, BkpBlock *bkpb); XLogRecPtr *lsn, BkpBlock *bkpb);
static bool AdvanceXLInsertBuffer(void); static bool AdvanceXLInsertBuffer(void);
static void XLogWrite(XLogwrtRqst WriteRqst, bool flexible); static void XLogWrite(XLogwrtRqst WriteRqst, bool flexible);
...@@ -544,6 +545,7 @@ XLogInsert(RmgrId rmid, uint8 info, XLogRecData *rdata) ...@@ -544,6 +545,7 @@ XLogInsert(RmgrId rmid, uint8 info, XLogRecData *rdata)
unsigned i; unsigned i;
XLogwrtRqst LogwrtRqst; XLogwrtRqst LogwrtRqst;
bool updrqst; bool updrqst;
bool doPageWrites;
bool no_tran = (rmid == RM_XLOG_ID) ? true : false; bool no_tran = (rmid == RM_XLOG_ID) ? true : false;
if (info & XLR_INFO_MASK) if (info & XLR_INFO_MASK)
...@@ -591,6 +593,14 @@ begin:; ...@@ -591,6 +593,14 @@ begin:;
dtbuf_bkp[i] = false; dtbuf_bkp[i] = false;
} }
/*
* Decide if we need to do full-page writes in this XLOG record: true if
* full_page_writes is on or we have a PITR request for it. Since we
* don't yet have the insert lock, forcePageWrites could change under us,
* but we'll recheck it once we have the lock.
*/
doPageWrites = fullPageWrites || Insert->forcePageWrites;
INIT_CRC32(rdata_crc); INIT_CRC32(rdata_crc);
len = 0; len = 0;
for (rdt = rdata;;) for (rdt = rdata;;)
...@@ -622,7 +632,8 @@ begin:; ...@@ -622,7 +632,8 @@ begin:;
{ {
/* OK, put it in this slot */ /* OK, put it in this slot */
dtbuf[i] = rdt->buffer; dtbuf[i] = rdt->buffer;
if (XLogCheckBuffer(rdt, &(dtbuf_lsn[i]), &(dtbuf_xlg[i]))) if (XLogCheckBuffer(rdt, doPageWrites,
&(dtbuf_lsn[i]), &(dtbuf_xlg[i])))
{ {
dtbuf_bkp[i] = true; dtbuf_bkp[i] = true;
rdt->data = NULL; rdt->data = NULL;
...@@ -735,12 +746,18 @@ begin:; ...@@ -735,12 +746,18 @@ begin:;
* Check to see if my RedoRecPtr is out of date. If so, may have to go * Check to see if my RedoRecPtr is out of date. If so, may have to go
* back and recompute everything. This can only happen just after a * back and recompute everything. This can only happen just after a
* checkpoint, so it's better to be slow in this case and fast otherwise. * checkpoint, so it's better to be slow in this case and fast otherwise.
*
* If we aren't doing full-page writes then RedoRecPtr doesn't actually
* affect the contents of the XLOG record, so we'll update our local
* copy but not force a recomputation.
*/ */
if (!XLByteEQ(RedoRecPtr, Insert->RedoRecPtr)) if (!XLByteEQ(RedoRecPtr, Insert->RedoRecPtr))
{ {
Assert(XLByteLT(RedoRecPtr, Insert->RedoRecPtr)); Assert(XLByteLT(RedoRecPtr, Insert->RedoRecPtr));
RedoRecPtr = Insert->RedoRecPtr; RedoRecPtr = Insert->RedoRecPtr;
if (doPageWrites)
{
for (i = 0; i < XLR_MAX_BKP_BLOCKS; i++) for (i = 0; i < XLR_MAX_BKP_BLOCKS; i++)
{ {
if (dtbuf[i] == InvalidBuffer) if (dtbuf[i] == InvalidBuffer)
...@@ -749,8 +766,8 @@ begin:; ...@@ -749,8 +766,8 @@ begin:;
XLByteLE(dtbuf_lsn[i], RedoRecPtr)) XLByteLE(dtbuf_lsn[i], RedoRecPtr))
{ {
/* /*
* Oops, this buffer now needs to be backed up, but we didn't * Oops, this buffer now needs to be backed up, but we
* think so above. Start over. * didn't think so above. Start over.
*/ */
LWLockRelease(WALInsertLock); LWLockRelease(WALInsertLock);
END_CRIT_SECTION(); END_CRIT_SECTION();
...@@ -758,6 +775,21 @@ begin:; ...@@ -758,6 +775,21 @@ begin:;
} }
} }
} }
}
/*
* Also check to see if forcePageWrites was just turned on; if we
* weren't already doing full-page writes then go back and recompute.
* (If it was just turned off, we could recompute the record without
* full pages, but we choose not to bother.)
*/
if (Insert->forcePageWrites && !doPageWrites)
{
/* Oops, must redo it with full-page data */
LWLockRelease(WALInsertLock);
END_CRIT_SECTION();
goto begin;
}
/* /*
* Make additional rdata chain entries for the backup blocks, so that we * Make additional rdata chain entries for the backup blocks, so that we
...@@ -966,7 +998,7 @@ begin:; ...@@ -966,7 +998,7 @@ begin:;
* save the buffer's LSN at *lsn. * save the buffer's LSN at *lsn.
*/ */
static bool static bool
XLogCheckBuffer(XLogRecData *rdata, XLogCheckBuffer(XLogRecData *rdata, bool doPageWrites,
XLogRecPtr *lsn, BkpBlock *bkpb) XLogRecPtr *lsn, BkpBlock *bkpb)
{ {
PageHeader page; PageHeader page;
...@@ -980,7 +1012,7 @@ XLogCheckBuffer(XLogRecData *rdata, ...@@ -980,7 +1012,7 @@ XLogCheckBuffer(XLogRecData *rdata,
*/ */
*lsn = page->pd_lsn; *lsn = page->pd_lsn;
if (fullPageWrites && if (doPageWrites &&
XLByteLE(page->pd_lsn, RedoRecPtr)) XLByteLE(page->pd_lsn, RedoRecPtr))
{ {
/* /*
...@@ -5651,18 +5683,49 @@ pg_start_backup(PG_FUNCTION_ARGS) ...@@ -5651,18 +5683,49 @@ pg_start_backup(PG_FUNCTION_ARGS)
PointerGetDatum(backupid))); PointerGetDatum(backupid)));
/* /*
* Force a CHECKPOINT. This is not strictly necessary, but it seems like * Mark backup active in shared memory. We must do full-page WAL writes
* a good idea to minimize the amount of past WAL needed to use the * during an on-line backup even if not doing so at other times, because
* backup. Also, this guarantees that two successive backup runs will * it's quite possible for the backup dump to obtain a "torn" (partially
* have different checkpoint positions and hence different history file * written) copy of a database page if it reads the page concurrently
* names, even if nothing happened in between. * with our write to the same page. This can be fixed as long as the
* first write to the page in the WAL sequence is a full-page write.
* Hence, we turn on forcePageWrites and then force a CHECKPOINT, to
* ensure there are no dirty pages in shared memory that might get
* dumped while the backup is in progress without having a corresponding
* WAL record. (Once the backup is complete, we need not force full-page
* writes anymore, since we expect that any pages not modified during
* the backup interval must have been correctly captured by the backup.)
*
* We must hold WALInsertLock to change the value of forcePageWrites,
* to ensure adequate interlocking against XLogInsert().
*/
LWLockAcquire(WALInsertLock, LW_EXCLUSIVE);
if (XLogCtl->Insert.forcePageWrites)
{
LWLockRelease(WALInsertLock);
ereport(ERROR,
(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
errmsg("a backup is already in progress"),
errhint("Run pg_stop_backup() and try again.")));
}
XLogCtl->Insert.forcePageWrites = true;
LWLockRelease(WALInsertLock);
/* Use a TRY block to ensure we release forcePageWrites if fail below */
PG_TRY();
{
/*
* Force a CHECKPOINT. Aside from being necessary to prevent torn
* page problems, this guarantees that two successive backup runs will
* have different checkpoint positions and hence different history
* file names, even if nothing happened in between.
*/ */
RequestCheckpoint(true, false); RequestCheckpoint(true, false);
/* /*
* Now we need to fetch the checkpoint record location, and also its REDO * Now we need to fetch the checkpoint record location, and also its
* pointer. The oldest point in WAL that would be needed to restore * REDO pointer. The oldest point in WAL that would be needed to
* starting from the checkpoint is precisely the REDO pointer. * restore starting from the checkpoint is precisely the REDO pointer.
*/ */
LWLockAcquire(ControlFileLock, LW_EXCLUSIVE); LWLockAcquire(ControlFileLock, LW_EXCLUSIVE);
checkpointloc = ControlFile->checkPoint; checkpointloc = ControlFile->checkPoint;
...@@ -5673,10 +5736,10 @@ pg_start_backup(PG_FUNCTION_ARGS) ...@@ -5673,10 +5736,10 @@ pg_start_backup(PG_FUNCTION_ARGS)
XLogFileName(xlogfilename, ThisTimeLineID, _logId, _logSeg); XLogFileName(xlogfilename, ThisTimeLineID, _logId, _logSeg);
/* /*
* We deliberately use strftime/localtime not the src/timezone functions, * We deliberately use strftime/localtime not the src/timezone
* so that backup labels will consistently be recorded in the same * functions, so that backup labels will consistently be recorded in
* timezone regardless of TimeZone setting. This matches elog.c's * the same timezone regardless of TimeZone setting. This matches
* practice. * elog.c's practice.
*/ */
stamp_time = time(NULL); stamp_time = time(NULL);
strftime(strfbuf, sizeof(strfbuf), strftime(strfbuf, sizeof(strfbuf),
...@@ -5684,7 +5747,9 @@ pg_start_backup(PG_FUNCTION_ARGS) ...@@ -5684,7 +5747,9 @@ pg_start_backup(PG_FUNCTION_ARGS)
localtime(&stamp_time)); localtime(&stamp_time));
/* /*
* Check for existing backup label --- implies a backup is already running * Check for existing backup label --- implies a backup is already
* running. (XXX given that we checked forcePageWrites above, maybe
* it would be OK to just unlink any such label file?)
*/ */
if (stat(BACKUP_LABEL_FILE, &stat_buf) != 0) if (stat(BACKUP_LABEL_FILE, &stat_buf) != 0)
{ {
...@@ -5721,6 +5786,17 @@ pg_start_backup(PG_FUNCTION_ARGS) ...@@ -5721,6 +5786,17 @@ pg_start_backup(PG_FUNCTION_ARGS)
(errcode_for_file_access(), (errcode_for_file_access(),
errmsg("could not write file \"%s\": %m", errmsg("could not write file \"%s\": %m",
BACKUP_LABEL_FILE))); BACKUP_LABEL_FILE)));
}
PG_CATCH();
{
/* Turn off forcePageWrites on failure */
LWLockAcquire(WALInsertLock, LW_EXCLUSIVE);
XLogCtl->Insert.forcePageWrites = false;
LWLockRelease(WALInsertLock);
PG_RE_THROW();
}
PG_END_TRY();
/* /*
* We're done. As a convenience, return the starting WAL offset. * We're done. As a convenience, return the starting WAL offset.
...@@ -5766,10 +5842,12 @@ pg_stop_backup(PG_FUNCTION_ARGS) ...@@ -5766,10 +5842,12 @@ pg_stop_backup(PG_FUNCTION_ARGS)
/* /*
* Get the current end-of-WAL position; it will be unsafe to use this dump * Get the current end-of-WAL position; it will be unsafe to use this dump
* to restore to a point in advance of this time. * to restore to a point in advance of this time. We can also clear
* forcePageWrites here.
*/ */
LWLockAcquire(WALInsertLock, LW_EXCLUSIVE); LWLockAcquire(WALInsertLock, LW_EXCLUSIVE);
INSERT_RECPTR(stoppoint, Insert, Insert->curridx); INSERT_RECPTR(stoppoint, Insert, Insert->curridx);
XLogCtl->Insert.forcePageWrites = false;
LWLockRelease(WALInsertLock); LWLockRelease(WALInsertLock);
XLByteToSeg(stoppoint, _logId, _logSeg); XLByteToSeg(stoppoint, _logId, _logSeg);
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment