Commit 5a38af7f authored by Tom Lane

Rearrange XLogFileInit so that the control-file spinlock is not held while
filling the new log file with zeroes, only while renaming it into place.  This
should prevent problems with 'stuck spinlock' errors under heavy load.
parent 5dc0e67b
...@@ -7,7 +7,7 @@ ...@@ -7,7 +7,7 @@
* Portions Copyright (c) 1996-2001, PostgreSQL Global Development Group * Portions Copyright (c) 1996-2001, PostgreSQL Global Development Group
* Portions Copyright (c) 1994, Regents of the University of California * Portions Copyright (c) 1994, Regents of the University of California
* *
* $Header: /cvsroot/pgsql/src/backend/access/transam/xlog.c,v 1.59 2001/03/16 05:44:33 tgl Exp $ * $Header: /cvsroot/pgsql/src/backend/access/transam/xlog.c,v 1.60 2001/03/17 20:54:13 tgl Exp $
* *
*------------------------------------------------------------------------- *-------------------------------------------------------------------------
*/ */
...@@ -86,7 +86,7 @@ ...@@ -86,7 +86,7 @@
/* Max time to wait to acquire XLog activity locks */ /* Max time to wait to acquire XLog activity locks */
#define XLOG_LOCK_TIMEOUT (5*60*1000000) /* 5 minutes */ #define XLOG_LOCK_TIMEOUT (5*60*1000000) /* 5 minutes */
/* Max time to wait to acquire checkpoint lock */ /* Max time to wait to acquire checkpoint lock */
#define CHECKPOINT_LOCK_TIMEOUT (10*60*1000000) /* 10 minutes */ #define CHECKPOINT_LOCK_TIMEOUT (20*60*1000000) /* 20 minutes */
/* User-settable parameters */ /* User-settable parameters */
int CheckPointSegments = 3; int CheckPointSegments = 3;
...@@ -335,10 +335,6 @@ static ControlFileData *ControlFile = NULL; ...@@ -335,10 +335,6 @@ static ControlFileData *ControlFile = NULL;
snprintf(path, MAXPGPATH, "%s%c%08X%08X", \ snprintf(path, MAXPGPATH, "%s%c%08X%08X", \
XLogDir, SEP_CHAR, log, seg) XLogDir, SEP_CHAR, log, seg)
#define XLogTempFileName(path, log, seg) \
snprintf(path, MAXPGPATH, "%s%cT%08X%08X", \
XLogDir, SEP_CHAR, log, seg)
#define PrevBufIdx(idx) \ #define PrevBufIdx(idx) \
(((idx) == 0) ? XLogCtl->XLogCacheBlck : ((idx) - 1)) (((idx) == 0) ? XLogCtl->XLogCacheBlck : ((idx) - 1))
...@@ -401,7 +397,8 @@ static bool InRedo = false; ...@@ -401,7 +397,8 @@ static bool InRedo = false;
static bool AdvanceXLInsertBuffer(void); static bool AdvanceXLInsertBuffer(void);
static void XLogWrite(XLogwrtRqst WriteRqst); static void XLogWrite(XLogwrtRqst WriteRqst);
static int XLogFileInit(uint32 log, uint32 seg, bool *usexistent); static int XLogFileInit(uint32 log, uint32 seg,
bool *use_existent, bool use_lock);
static int XLogFileOpen(uint32 log, uint32 seg, bool econt); static int XLogFileOpen(uint32 log, uint32 seg, bool econt);
static void PreallocXlogFiles(XLogRecPtr endptr); static void PreallocXlogFiles(XLogRecPtr endptr);
static void MoveOfflineLogs(uint32 log, uint32 seg); static void MoveOfflineLogs(uint32 log, uint32 seg);
...@@ -960,7 +957,7 @@ XLogWrite(XLogwrtRqst WriteRqst) ...@@ -960,7 +957,7 @@ XLogWrite(XLogwrtRqst WriteRqst)
XLogCtlWrite *Write = &XLogCtl->Write; XLogCtlWrite *Write = &XLogCtl->Write;
char *from; char *from;
bool ispartialpage; bool ispartialpage;
bool usexistent; bool use_existent;
/* Update local LogwrtResult (caller probably did this already, but...) */ /* Update local LogwrtResult (caller probably did this already, but...) */
LogwrtResult = Write->LogwrtResult; LogwrtResult = Write->LogwrtResult;
...@@ -994,12 +991,18 @@ XLogWrite(XLogwrtRqst WriteRqst) ...@@ -994,12 +991,18 @@ XLogWrite(XLogwrtRqst WriteRqst)
} }
XLByteToPrevSeg(LogwrtResult.Write, openLogId, openLogSeg); XLByteToPrevSeg(LogwrtResult.Write, openLogId, openLogSeg);
/* create/use new log file; need lock in case creating */ /* create/use new log file */
SpinAcquire(ControlFileLockId); use_existent = true;
usexistent = true; openLogFile = XLogFileInit(openLogId, openLogSeg,
openLogFile = XLogFileInit(openLogId, openLogSeg, &usexistent); &use_existent, true);
openLogOff = 0; openLogOff = 0;
if (!use_existent) /* there was no precreated file */
elog(LOG, "XLogWrite: new log file created - "
"consider increasing WAL_FILES");
/* update pg_control, unless someone else already did */ /* update pg_control, unless someone else already did */
SpinAcquire(ControlFileLockId);
if (ControlFile->logId != openLogId || if (ControlFile->logId != openLogId ||
ControlFile->logSeg != openLogSeg + 1) ControlFile->logSeg != openLogSeg + 1)
{ {
...@@ -1007,28 +1010,23 @@ XLogWrite(XLogwrtRqst WriteRqst) ...@@ -1007,28 +1010,23 @@ XLogWrite(XLogwrtRqst WriteRqst)
ControlFile->logSeg = openLogSeg + 1; ControlFile->logSeg = openLogSeg + 1;
ControlFile->time = time(NULL); ControlFile->time = time(NULL);
UpdateControlFile(); UpdateControlFile();
/*
* Signal postmaster to start a checkpoint if it's been too
* long since the last one. (We look at local copy of
* RedoRecPtr which might be a little out of date, but should
* be close enough for this purpose.)
*/
if (IsUnderPostmaster &&
(openLogId != RedoRecPtr.xlogid ||
openLogSeg >= (RedoRecPtr.xrecoff / XLogSegSize) +
(uint32) CheckPointSegments))
{
if (XLOG_DEBUG)
fprintf(stderr, "XLogWrite: time for a checkpoint, signaling postmaster\n");
kill(getppid(), SIGUSR1);
}
} }
SpinRelease(ControlFileLockId); SpinRelease(ControlFileLockId);
if (!usexistent) /* there was no precreated file */
elog(LOG, "XLogWrite: new log file created - "
"consider increasing WAL_FILES");
/*
* Signal postmaster to start a checkpoint if it's been too
* long since the last one. (We look at local copy of RedoRecPtr
* which might be a little out of date, but should be close enough
* for this purpose.)
*/
if (IsUnderPostmaster &&
(openLogId != RedoRecPtr.xlogid ||
openLogSeg >= (RedoRecPtr.xrecoff / XLogSegSize) +
(uint32) CheckPointSegments))
{
if (XLOG_DEBUG)
fprintf(stderr, "XLogWrite: time for a checkpoint, signaling postmaster\n");
kill(getppid(), SIGUSR1);
}
} }
if (openLogFile < 0) if (openLogFile < 0)
...@@ -1230,14 +1228,28 @@ XLogFlush(XLogRecPtr record) ...@@ -1230,14 +1228,28 @@ XLogFlush(XLogRecPtr record)
/* /*
* Create a new XLOG file segment, or open a pre-existing one. * Create a new XLOG file segment, or open a pre-existing one.
* *
* log, seg: identify segment to be created/opened.
*
* *use_existent: if TRUE, OK to use a pre-existing file (else, any
* pre-existing file will be deleted). On return, TRUE if a pre-existing
* file was used.
*
* use_lock: if TRUE, acquire ControlFileLock spinlock while moving file into
* place. This should be TRUE except during bootstrap log creation. The
* caller must *not* hold the spinlock at call.
*
* Returns FD of opened file. * Returns FD of opened file.
*/ */
static int static int
XLogFileInit(uint32 log, uint32 seg, bool *usexistent) XLogFileInit(uint32 log, uint32 seg,
bool *use_existent, bool use_lock)
{ {
char path[MAXPGPATH]; char path[MAXPGPATH];
char tpath[MAXPGPATH]; char tmppath[MAXPGPATH];
char targpath[MAXPGPATH];
char zbuffer[BLCKSZ]; char zbuffer[BLCKSZ];
uint32 targlog,
targseg;
int fd; int fd;
int nbytes; int nbytes;
...@@ -1246,7 +1258,7 @@ XLogFileInit(uint32 log, uint32 seg, bool *usexistent) ...@@ -1246,7 +1258,7 @@ XLogFileInit(uint32 log, uint32 seg, bool *usexistent)
/* /*
* Try to use existent file (checkpoint maker may have created it already) * Try to use existent file (checkpoint maker may have created it already)
*/ */
if (*usexistent) if (*use_existent)
{ {
fd = BasicOpenFile(path, O_RDWR | PG_BINARY | XLOG_SYNC_BIT, fd = BasicOpenFile(path, O_RDWR | PG_BINARY | XLOG_SYNC_BIT,
S_IRUSR | S_IWUSR); S_IRUSR | S_IWUSR);
...@@ -1258,20 +1270,24 @@ XLogFileInit(uint32 log, uint32 seg, bool *usexistent) ...@@ -1258,20 +1270,24 @@ XLogFileInit(uint32 log, uint32 seg, bool *usexistent)
} }
else else
return(fd); return(fd);
/* Set flag to tell caller there was no existent file */
*usexistent = false;
} }
XLogTempFileName(tpath, log, seg); /*
unlink(tpath); * Initialize an empty (all zeroes) segment. NOTE: it is possible that
unlink(path); * another process is doing the same thing. If so, we will end up
* pre-creating an extra log segment. That seems OK, and better than
* holding the spinlock throughout this lengthy process.
*/
snprintf(tmppath, MAXPGPATH, "%s%cxlogtemp.%d",
XLogDir, SEP_CHAR, (int) getpid());
unlink(tmppath);
/* do not use XLOG_SYNC_BIT here --- want to fsync only at end of fill */ /* do not use XLOG_SYNC_BIT here --- want to fsync only at end of fill */
fd = BasicOpenFile(tpath, O_RDWR | O_CREAT | O_EXCL | PG_BINARY, fd = BasicOpenFile(tmppath, O_RDWR | O_CREAT | O_EXCL | PG_BINARY,
S_IRUSR | S_IWUSR); S_IRUSR | S_IWUSR);
if (fd < 0) if (fd < 0)
elog(STOP, "InitCreate(logfile %u seg %u) failed: %m", elog(STOP, "InitCreate(%s) failed: %m", tmppath);
log, seg);
/* /*
* Zero-fill the file. We have to do this the hard way to ensure that * Zero-fill the file. We have to do this the hard way to ensure that
...@@ -1290,36 +1306,73 @@ XLogFileInit(uint32 log, uint32 seg, bool *usexistent) ...@@ -1290,36 +1306,73 @@ XLogFileInit(uint32 log, uint32 seg, bool *usexistent)
int save_errno = errno; int save_errno = errno;
/* If we fail to make the file, delete it to release disk space */ /* If we fail to make the file, delete it to release disk space */
unlink(tpath); unlink(tmppath);
errno = save_errno; errno = save_errno;
elog(STOP, "ZeroFill(logfile %u seg %u) failed: %m", elog(STOP, "ZeroFill(%s) failed: %m", tmppath);
log, seg);
} }
} }
if (pg_fsync(fd) != 0) if (pg_fsync(fd) != 0)
elog(STOP, "fsync(logfile %u seg %u) failed: %m", elog(STOP, "fsync(%s) failed: %m", tmppath);
log, seg);
close(fd); close(fd);
/* /*
* Prefer link() to rename() here just to be sure that we don't overwrite * Now move the segment into place with its final name. We want to be
* an existing logfile. However, there shouldn't be one, so rename() * sure that only one process does this at a time.
* is an acceptable substitute except for the truly paranoid. */
if (use_lock)
SpinAcquire(ControlFileLockId);
/*
* If caller didn't want to use a pre-existing file, get rid of any
* pre-existing file. Otherwise, cope with possibility that someone
* else has created the file while we were filling ours: if so, use
* ours to pre-create a future log segment.
*/
targlog = log;
targseg = seg;
strcpy(targpath, path);
if (! *use_existent)
{
unlink(targpath);
}
else
{
while ((fd = BasicOpenFile(targpath, O_RDWR | PG_BINARY,
S_IRUSR | S_IWUSR)) >= 0)
{
close(fd);
NextLogSeg(targlog, targseg);
XLogFileName(targpath, targlog, targseg);
}
}
/*
* Prefer link() to rename() here just to be really sure that we don't
* overwrite an existing logfile. However, there shouldn't be one, so
* rename() is an acceptable substitute except for the truly paranoid.
*/ */
#ifndef __BEOS__ #ifndef __BEOS__
if (link(tpath, path) < 0) if (link(tmppath, targpath) < 0)
elog(STOP, "InitRelink(logfile %u seg %u) failed: %m", elog(STOP, "InitRelink(logfile %u seg %u) failed: %m",
log, seg); targlog, targseg);
unlink(tpath); unlink(tmppath);
#else #else
if (rename(tpath, path) < 0) if (rename(tmppath, targpath) < 0)
elog(STOP, "InitRelink(logfile %u seg %u) failed: %m", elog(STOP, "InitRelink(logfile %u seg %u) failed: %m",
log, seg); targlog, targseg);
#endif #endif
if (use_lock)
SpinRelease(ControlFileLockId);
/* Set flag to tell caller there was no existent file */
*use_existent = false;
/* Now open original target segment (might not be file I just made) */
fd = BasicOpenFile(path, O_RDWR | PG_BINARY | XLOG_SYNC_BIT, fd = BasicOpenFile(path, O_RDWR | PG_BINARY | XLOG_SYNC_BIT,
S_IRUSR | S_IWUSR); S_IRUSR | S_IWUSR);
if (fd < 0) if (fd < 0)
...@@ -1367,8 +1420,7 @@ PreallocXlogFiles(XLogRecPtr endptr) ...@@ -1367,8 +1420,7 @@ PreallocXlogFiles(XLogRecPtr endptr)
uint32 _logId; uint32 _logId;
uint32 _logSeg; uint32 _logSeg;
int lf; int lf;
bool usexistent; bool use_existent;
struct timeval delay;
int i; int i;
XLByteToPrevSeg(endptr, _logId, _logSeg); XLByteToPrevSeg(endptr, _logId, _logSeg);
...@@ -1376,30 +1428,19 @@ PreallocXlogFiles(XLogRecPtr endptr) ...@@ -1376,30 +1428,19 @@ PreallocXlogFiles(XLogRecPtr endptr)
{ {
for (i = 1; i <= XLOGfiles; i++) for (i = 1; i <= XLOGfiles; i++)
{ {
usexistent = true;
NextLogSeg(_logId, _logSeg); NextLogSeg(_logId, _logSeg);
SpinAcquire(ControlFileLockId); use_existent = true;
lf = XLogFileInit(_logId, _logSeg, &usexistent); lf = XLogFileInit(_logId, _logSeg, &use_existent, true);
close(lf); close(lf);
SpinRelease(ControlFileLockId);
/*
* Give up ControlFileLockId for 1/50 sec to let other
* backends switch to new log file in XLogWrite()
*/
delay.tv_sec = 0;
delay.tv_usec = 20000;
(void) select(0, NULL, NULL, NULL, &delay);
} }
} }
else if ((endptr.xrecoff - 1) % XLogSegSize >= else if ((endptr.xrecoff - 1) % XLogSegSize >=
(uint32) (0.75 * XLogSegSize)) (uint32) (0.75 * XLogSegSize))
{ {
usexistent = true;
NextLogSeg(_logId, _logSeg); NextLogSeg(_logId, _logSeg);
SpinAcquire(ControlFileLockId); use_existent = true;
lf = XLogFileInit(_logId, _logSeg, &usexistent); lf = XLogFileInit(_logId, _logSeg, &use_existent, true);
close(lf); close(lf);
SpinRelease(ControlFileLockId);
} }
} }
...@@ -2103,7 +2144,7 @@ BootStrapXLOG(void) ...@@ -2103,7 +2144,7 @@ BootStrapXLOG(void)
char *buffer; char *buffer;
XLogPageHeader page; XLogPageHeader page;
XLogRecord *record; XLogRecord *record;
bool usexistent = false; bool use_existent;
crc64 crc; crc64 crc;
/* Use malloc() to ensure buffer is MAXALIGNED */ /* Use malloc() to ensure buffer is MAXALIGNED */
...@@ -2144,7 +2185,8 @@ BootStrapXLOG(void) ...@@ -2144,7 +2185,8 @@ BootStrapXLOG(void)
FIN_CRC64(crc); FIN_CRC64(crc);
record->xl_crc = crc; record->xl_crc = crc;
openLogFile = XLogFileInit(0, 0, &usexistent); use_existent = false;
openLogFile = XLogFileInit(0, 0, &use_existent, false);
if (write(openLogFile, buffer, BLCKSZ) != BLCKSZ) if (write(openLogFile, buffer, BLCKSZ) != BLCKSZ)
elog(STOP, "BootStrapXLOG failed to write logfile: %m"); elog(STOP, "BootStrapXLOG failed to write logfile: %m");
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment