Commit 1bb25580 authored by Heikki Linnakangas

Make standby server continuously retry restoring the next WAL segment with
restore_command if the connection to the primary server is lost. This
ensures that the standby can recover automatically if the connection is
lost for a long time and the standby falls behind so much that the required
WAL segments have been archived and deleted on the master.

This also makes standby_mode useful without streaming replication; the
server will keep retrying restore_command every few seconds until the
trigger file is found. That's the same basic functionality pg_standby
offers, but without the bells and whistles.

To implement that, refactor the ReadRecord/FetchRecord functions. The
FetchRecord() function introduced in the original streaming replication
patch is removed, and all the retry logic is now in a new function called
XLogPageRead(). XLogPageRead() is now responsible for executing
restore_command, launching walreceiver, and waiting for new WAL to arrive
from the primary, as required.

This also changes the life cycle of walreceiver. When launched, it now only
tries to connect to the master once, and exits if the connection fails, or
is lost during streaming for any reason. The startup process detects the
death, and re-launches walreceiver if necessary.
parent ab13d1e9
...@@ -7,7 +7,7 @@ ...@@ -7,7 +7,7 @@
* Portions Copyright (c) 1996-2010, PostgreSQL Global Development Group * Portions Copyright (c) 1996-2010, PostgreSQL Global Development Group
* Portions Copyright (c) 1994, Regents of the University of California * Portions Copyright (c) 1994, Regents of the University of California
* *
* $PostgreSQL: pgsql/src/backend/access/transam/xlog.c,v 1.361 2010/01/26 00:07:13 sriggs Exp $ * $PostgreSQL: pgsql/src/backend/access/transam/xlog.c,v 1.362 2010/01/27 15:27:50 heikki Exp $
* *
*------------------------------------------------------------------------- *-------------------------------------------------------------------------
*/ */
...@@ -143,16 +143,6 @@ HotStandbyState standbyState = STANDBY_DISABLED; ...@@ -143,16 +143,6 @@ HotStandbyState standbyState = STANDBY_DISABLED;
static XLogRecPtr LastRec; static XLogRecPtr LastRec;
/*
* Are we doing recovery from XLOG stream? If so, we recover without using
* offline XLOG archives even though InArchiveRecovery==true. This flag is
* used only in standby mode.
*/
static bool InStreamingRecovery = false;
/* The current log page is partially-filled, and so needs to be read again? */
static bool needReread = false;
/* /*
* Local copy of SharedRecoveryInProgress variable. True actually means "not * Local copy of SharedRecoveryInProgress variable. True actually means "not
* known, need to check the shared state". * known, need to check the shared state".
...@@ -457,12 +447,16 @@ static uint32 openLogOff = 0; ...@@ -457,12 +447,16 @@ static uint32 openLogOff = 0;
* These variables are used similarly to the ones above, but for reading * These variables are used similarly to the ones above, but for reading
* the XLOG. Note, however, that readOff generally represents the offset * the XLOG. Note, however, that readOff generally represents the offset
* of the page just read, not the seek position of the FD itself, which * of the page just read, not the seek position of the FD itself, which
* will be just past that page. * will be just past that page. readLen indicates how much of the current
* page has been read into readBuf.
*/ */
static int readFile = -1; static int readFile = -1;
static uint32 readId = 0; static uint32 readId = 0;
static uint32 readSeg = 0; static uint32 readSeg = 0;
static uint32 readOff = 0; static uint32 readOff = 0;
static uint32 readLen = 0;
/* Is the currently open segment being streamed from primary? */
static bool readStreamed = false;
/* Buffer for currently read page (XLOG_BLCKSZ bytes) */ /* Buffer for currently read page (XLOG_BLCKSZ bytes) */
static char *readBuf = NULL; static char *readBuf = NULL;
...@@ -474,7 +468,6 @@ static uint32 readRecordBufSize = 0; ...@@ -474,7 +468,6 @@ static uint32 readRecordBufSize = 0;
/* State information for XLOG reading */ /* State information for XLOG reading */
static XLogRecPtr ReadRecPtr; /* start of last record read */ static XLogRecPtr ReadRecPtr; /* start of last record read */
static XLogRecPtr EndRecPtr; /* end+1 of last record read */ static XLogRecPtr EndRecPtr; /* end+1 of last record read */
static XLogRecord *nextRecord = NULL;
static TimeLineID lastPageTLI = 0; static TimeLineID lastPageTLI = 0;
static XLogRecPtr minRecoveryPoint; /* local copy of static XLogRecPtr minRecoveryPoint; /* local copy of
...@@ -516,7 +509,12 @@ static void XLogWrite(XLogwrtRqst WriteRqst, bool flexible, bool xlog_switch); ...@@ -516,7 +509,12 @@ static void XLogWrite(XLogwrtRqst WriteRqst, bool flexible, bool xlog_switch);
static bool InstallXLogFileSegment(uint32 *log, uint32 *seg, char *tmppath, static bool InstallXLogFileSegment(uint32 *log, uint32 *seg, char *tmppath,
bool find_free, int *max_advance, bool find_free, int *max_advance,
bool use_lock); bool use_lock);
static int XLogFileRead(uint32 log, uint32 seg, int emode); static int XLogFileRead(uint32 log, uint32 seg, int emode, TimeLineID tli,
bool fromArchive, bool notexistOk);
static int XLogFileReadAnyTLI(uint32 log, uint32 seg, int emode,
bool fromArchive);
static bool XLogPageRead(XLogRecPtr *RecPtr, int emode, bool fetching_ckpt,
bool randAccess);
static void XLogFileClose(void); static void XLogFileClose(void);
static bool RestoreArchivedFile(char *path, const char *xlogfname, static bool RestoreArchivedFile(char *path, const char *xlogfname,
const char *recovername, off_t expectedSize); const char *recovername, off_t expectedSize);
...@@ -526,8 +524,7 @@ static void RemoveOldXlogFiles(uint32 log, uint32 seg, XLogRecPtr endptr); ...@@ -526,8 +524,7 @@ static void RemoveOldXlogFiles(uint32 log, uint32 seg, XLogRecPtr endptr);
static void ValidateXLOGDirectoryStructure(void); static void ValidateXLOGDirectoryStructure(void);
static void CleanupBackupHistory(void); static void CleanupBackupHistory(void);
static void UpdateMinRecoveryPoint(XLogRecPtr lsn, bool force); static void UpdateMinRecoveryPoint(XLogRecPtr lsn, bool force);
static XLogRecord *FetchRecord(XLogRecPtr *RecPtr, int emode, bool fetching_ckpt); static XLogRecord *ReadRecord(XLogRecPtr *RecPtr, int emode, bool fetching_ckpt);
static XLogRecord *ReadRecord(XLogRecPtr *RecPtr, int emode);
static bool ValidXLOGHeader(XLogPageHeader hdr, int emode); static bool ValidXLOGHeader(XLogPageHeader hdr, int emode);
static XLogRecord *ReadCheckpointRecord(XLogRecPtr RecPtr, int whichChkpt); static XLogRecord *ReadCheckpointRecord(XLogRecPtr RecPtr, int whichChkpt);
static List *readTimeLineHistory(TimeLineID targetTLI); static List *readTimeLineHistory(TimeLineID targetTLI);
...@@ -539,6 +536,7 @@ static void writeTimeLineHistory(TimeLineID newTLI, TimeLineID parentTLI, ...@@ -539,6 +536,7 @@ static void writeTimeLineHistory(TimeLineID newTLI, TimeLineID parentTLI,
static void WriteControlFile(void); static void WriteControlFile(void);
static void ReadControlFile(void); static void ReadControlFile(void);
static char *str_time(pg_time_t tnow); static char *str_time(pg_time_t tnow);
static bool CheckForStandbyTrigger(void);
#ifdef WAL_DEBUG #ifdef WAL_DEBUG
static void xlog_outrec(StringInfo buf, XLogRecord *record); static void xlog_outrec(StringInfo buf, XLogRecord *record);
...@@ -2586,36 +2584,22 @@ XLogFileOpen(uint32 log, uint32 seg) ...@@ -2586,36 +2584,22 @@ XLogFileOpen(uint32 log, uint32 seg)
/* /*
* Open a logfile segment for reading (during recovery). * Open a logfile segment for reading (during recovery).
*
* If fromArchive is true, the segment is retrieved from archive, otherwise
* it's read from pg_xlog.
*/ */
static int static int
XLogFileRead(uint32 log, uint32 seg, int emode) XLogFileRead(uint32 log, uint32 seg, int emode, TimeLineID tli,
bool fromArchive, bool notfoundOk)
{ {
char path[MAXPGPATH];
char xlogfname[MAXFNAMELEN]; char xlogfname[MAXFNAMELEN];
char activitymsg[MAXFNAMELEN + 16]; char activitymsg[MAXFNAMELEN + 16];
ListCell *cell; char path[MAXPGPATH];
int fd; int fd;
/*
* Loop looking for a suitable timeline ID: we might need to read any of
* the timelines listed in expectedTLIs.
*
* We expect curFileTLI on entry to be the TLI of the preceding file in
* sequence, or 0 if there was no predecessor. We do not allow curFileTLI
* to go backwards; this prevents us from picking up the wrong file when a
* parent timeline extends to higher segment numbers than the child we
* want to read.
*/
foreach(cell, expectedTLIs)
{
TimeLineID tli = (TimeLineID) lfirst_int(cell);
if (tli < curFileTLI)
break; /* don't bother looking at too-old TLIs */
XLogFileName(xlogfname, tli, log, seg); XLogFileName(xlogfname, tli, log, seg);
if (InArchiveRecovery && !InStreamingRecovery) if (fromArchive)
{ {
/* Report recovery progress in PS display */ /* Report recovery progress in PS display */
snprintf(activitymsg, sizeof(activitymsg), "waiting for %s", snprintf(activitymsg, sizeof(activitymsg), "waiting for %s",
...@@ -2625,9 +2609,14 @@ XLogFileRead(uint32 log, uint32 seg, int emode) ...@@ -2625,9 +2609,14 @@ XLogFileRead(uint32 log, uint32 seg, int emode)
restoredFromArchive = RestoreArchivedFile(path, xlogfname, restoredFromArchive = RestoreArchivedFile(path, xlogfname,
"RECOVERYXLOG", "RECOVERYXLOG",
XLogSegSize); XLogSegSize);
if (!restoredFromArchive)
return -1;
} }
else else
{
XLogFilePath(path, tli, log, seg); XLogFilePath(path, tli, log, seg);
restoredFromArchive = false;
}
fd = BasicOpenFile(path, O_RDONLY | PG_BINARY, 0); fd = BasicOpenFile(path, O_RDONLY | PG_BINARY, 0);
if (fd >= 0) if (fd >= 0)
...@@ -2642,11 +2631,62 @@ XLogFileRead(uint32 log, uint32 seg, int emode) ...@@ -2642,11 +2631,62 @@ XLogFileRead(uint32 log, uint32 seg, int emode)
return fd; return fd;
} }
if (errno != ENOENT) /* unexpected failure? */ if (errno != ENOENT || !notfoundOk) /* unexpected failure? */
ereport(PANIC, ereport(PANIC,
(errcode_for_file_access(), (errcode_for_file_access(),
errmsg("could not open file \"%s\" (log file %u, segment %u): %m", errmsg("could not open file \"%s\" (log file %u, segment %u): %m",
path, log, seg))); path, log, seg)));
return -1;
}
/*
* Open a logfile segment for reading (during recovery).
*
* This version searches for the segment with any TLI listed in expectedTLIs.
* When fromArchive is true and we are not in StandbyMode, pg_xlog is also
* searched if the segment is not found in the archive.
*/
static int
XLogFileReadAnyTLI(uint32 log, uint32 seg, int emode, bool fromArchive)
{
char path[MAXPGPATH];
ListCell *cell;
int fd;
/*
* Loop looking for a suitable timeline ID: we might need to read any of
* the timelines listed in expectedTLIs.
*
* We expect curFileTLI on entry to be the TLI of the preceding file in
* sequence, or 0 if there was no predecessor. We do not allow curFileTLI
* to go backwards; this prevents us from picking up the wrong file when a
* parent timeline extends to higher segment numbers than the child we
* want to read.
*/
foreach(cell, expectedTLIs)
{
TimeLineID tli = (TimeLineID) lfirst_int(cell);
if (tli < curFileTLI)
break; /* don't bother looking at too-old TLIs */
fd = XLogFileRead(log, seg, emode, tli, fromArchive, true);
if (fd != -1)
return fd;
/*
* If not in StandbyMode, fall back to searching pg_xlog. In
* StandbyMode we're streaming segments from the primary to pg_xlog,
* and we mustn't confuse the (possibly partial) segments in pg_xlog
* with complete segments ready to be applied. We would rather wait for
* the records to arrive through streaming.
*/
if (!StandbyMode && fromArchive)
{
fd = XLogFileRead(log, seg, emode, tli, false, true);
if (fd != -1)
return fd;
}
} }
/* Couldn't find it. For simplicity, complain about front timeline */ /* Couldn't find it. For simplicity, complain about front timeline */
...@@ -3163,7 +3203,7 @@ RemoveOldXlogFiles(uint32 log, uint32 seg, XLogRecPtr endptr) ...@@ -3163,7 +3203,7 @@ RemoveOldXlogFiles(uint32 log, uint32 seg, XLogRecPtr endptr)
* different filename that can't be confused with regular XLOG * different filename that can't be confused with regular XLOG
* files. * files.
*/ */
if (InStreamingRecovery || XLogArchiveCheckDone(xlde->d_name)) if (WalRcvInProgress() || XLogArchiveCheckDone(xlde->d_name))
{ {
snprintf(path, MAXPGPATH, XLOGDIR "/%s", xlde->d_name); snprintf(path, MAXPGPATH, XLOGDIR "/%s", xlde->d_name);
...@@ -3473,79 +3513,6 @@ RecordIsValid(XLogRecord *record, XLogRecPtr recptr, int emode) ...@@ -3473,79 +3513,6 @@ RecordIsValid(XLogRecord *record, XLogRecPtr recptr, int emode)
return true; return true;
} }
/*
* Attempt to fetch an XLOG record.
*
* If RecPtr is not NULL, try to fetch a record at that position. Otherwise
* try to fetch a record just after the last one previously read.
*
* In standby mode, if we failed in reading a valid record and are not doing
* recovery from XLOG stream yet, we ignore the failure and start walreceiver
* process to fetch the record from the primary. Otherwise, returns NULL,
* or fails if emode is PANIC. (emode must be either PANIC or LOG.)
*
* If fetching_ckpt is TRUE, RecPtr points to the checkpoint location. In
* this case, if we have to start XLOG streaming, we use RedoStartLSN as the
* streaming start position instead of RecPtr.
*
* The record is copied into readRecordBuf, so that on successful return,
* the returned record pointer always points there.
*/
static XLogRecord *
FetchRecord(XLogRecPtr *RecPtr, int emode, bool fetching_ckpt)
{
if (StandbyMode && !InStreamingRecovery)
{
XLogRecord *record;
XLogRecPtr startlsn;
bool haveNextRecord = (nextRecord != NULL);
/* An invalid record is OK here, so we set emode to DEBUG2 */
record = ReadRecord(RecPtr, DEBUG2);
if (record != NULL)
return record;
/*
* Start XLOG streaming if there is no more valid records available
* in the archive.
*
* We need to calculate the start position of XLOG streaming. If we
* read a record in the middle of a segment which doesn't exist in
* pg_xlog, we use the start of the segment as the start position.
* That prevents a broken segment (i.e., with no records in the
* first half of a segment) from being created by XLOG streaming,
* which might cause trouble later on if the segment is e.g
* archived.
*/
startlsn = fetching_ckpt ? RedoStartLSN : EndRecPtr;
if (startlsn.xrecoff % XLogSegSize != 0)
{
char xlogpath[MAXPGPATH];
struct stat stat_buf;
uint32 log;
uint32 seg;
XLByteToSeg(startlsn, log, seg);
XLogFilePath(xlogpath, recoveryTargetTLI, log, seg);
if (stat(xlogpath, &stat_buf) != 0)
startlsn.xrecoff -= startlsn.xrecoff % XLogSegSize;
}
RequestXLogStreaming(startlsn, PrimaryConnInfo);
/* Needs to read the current page again if the next record is in it */
needReread = haveNextRecord;
nextRecord = NULL;
InStreamingRecovery = true;
ereport(LOG,
(errmsg("starting streaming recovery at %X/%X",
startlsn.xlogid, startlsn.xrecoff)));
}
return ReadRecord(RecPtr, emode);
}
/* /*
* Attempt to read an XLOG record. * Attempt to read an XLOG record.
* *
...@@ -3553,13 +3520,13 @@ FetchRecord(XLogRecPtr *RecPtr, int emode, bool fetching_ckpt) ...@@ -3553,13 +3520,13 @@ FetchRecord(XLogRecPtr *RecPtr, int emode, bool fetching_ckpt)
* try to read a record just after the last one previously read. * try to read a record just after the last one previously read.
* *
* If no valid record is available, returns NULL, or fails if emode is PANIC. * If no valid record is available, returns NULL, or fails if emode is PANIC.
* (emode must be either PANIC, LOG or DEBUG2.) * (emode must be either PANIC or LOG.)
* *
* The record is copied into readRecordBuf, so that on successful return, * The record is copied into readRecordBuf, so that on successful return,
* the returned record pointer always points there. * the returned record pointer always points there.
*/ */
static XLogRecord * static XLogRecord *
ReadRecord(XLogRecPtr *RecPtr, int emode_arg) ReadRecord(XLogRecPtr *RecPtr, int emode_arg, bool fetching_ckpt)
{ {
XLogRecord *record; XLogRecord *record;
char *buffer; char *buffer;
...@@ -3567,11 +3534,8 @@ ReadRecord(XLogRecPtr *RecPtr, int emode_arg) ...@@ -3567,11 +3534,8 @@ ReadRecord(XLogRecPtr *RecPtr, int emode_arg)
bool randAccess = false; bool randAccess = false;
uint32 len, uint32 len,
total_len; total_len;
uint32 targetPageOff;
uint32 targetRecOff; uint32 targetRecOff;
uint32 pageHeaderSize; uint32 pageHeaderSize;
XLogRecPtr receivedUpto = {0,0};
bool finished;
int emode; int emode;
/* /*
...@@ -3579,7 +3543,7 @@ ReadRecord(XLogRecPtr *RecPtr, int emode_arg) ...@@ -3579,7 +3543,7 @@ ReadRecord(XLogRecPtr *RecPtr, int emode_arg)
* should never hit the end of WAL because we wait for it to be streamed. * should never hit the end of WAL because we wait for it to be streamed.
* Therefore treat any broken WAL as PANIC, instead of failing over. * Therefore treat any broken WAL as PANIC, instead of failing over.
*/ */
if (InStreamingRecovery) if (StandbyMode)
emode = PANIC; emode = PANIC;
else else
emode = emode_arg; emode = emode_arg;
...@@ -3600,20 +3564,16 @@ ReadRecord(XLogRecPtr *RecPtr, int emode_arg) ...@@ -3600,20 +3564,16 @@ ReadRecord(XLogRecPtr *RecPtr, int emode_arg)
if (RecPtr == NULL) if (RecPtr == NULL)
{ {
RecPtr = &tmpRecPtr; RecPtr = &tmpRecPtr;
/* fast case if next record is on same page */
if (nextRecord != NULL)
{
record = nextRecord;
goto got_record;
}
/* /*
* Align old recptr to next page if the current page is filled and * Align recptr to next page if no more records can fit on the
* doesn't need to be read again. * current page.
*/ */
if (!needReread) if (XLOG_BLCKSZ - (RecPtr->xrecoff % XLOG_BLCKSZ) < SizeOfXLogRecord)
{
NextLogPage(tmpRecPtr); NextLogPage(tmpRecPtr);
/* We will account for page header size below */ /* We will account for page header size below */
}
} }
else else
{ {
...@@ -3633,81 +3593,10 @@ ReadRecord(XLogRecPtr *RecPtr, int emode_arg) ...@@ -3633,81 +3593,10 @@ ReadRecord(XLogRecPtr *RecPtr, int emode_arg)
randAccess = true; /* allow curFileTLI to go backwards too */ randAccess = true; /* allow curFileTLI to go backwards too */
} }
if (readFile >= 0 && !XLByteInSeg(*RecPtr, readId, readSeg)) /* Read the page containing the record */
{ if (!XLogPageRead(RecPtr, emode, fetching_ckpt, randAccess))
close(readFile); return NULL;
readFile = -1;
}
/* Is the target record ready yet? */
if (InStreamingRecovery)
{
receivedUpto = WaitNextXLogAvailable(*RecPtr, &finished);
if (finished)
{
if (emode_arg == PANIC)
ereport(PANIC,
(errmsg("streaming recovery ended")));
else
return NULL;
}
}
XLByteToSeg(*RecPtr, readId, readSeg);
if (readFile < 0)
{
/* Now it's okay to reset curFileTLI if random fetch */
if (randAccess)
curFileTLI = 0;
readFile = XLogFileRead(readId, readSeg, emode);
if (readFile < 0)
goto next_record_is_invalid;
/*
* Whenever switching to a new WAL segment, we read the first page of
* the file and validate its header, even if that's not where the
* target record is. This is so that we can check the additional
* identification info that is present in the first page's "long"
* header.
*/
readOff = 0;
if (read(readFile, readBuf, XLOG_BLCKSZ) != XLOG_BLCKSZ)
{
ereport(emode,
(errcode_for_file_access(),
errmsg("could not read from log file %u, segment %u, offset %u: %m",
readId, readSeg, readOff)));
goto next_record_is_invalid;
}
if (!ValidXLOGHeader((XLogPageHeader) readBuf, emode))
goto next_record_is_invalid;
}
targetPageOff = ((RecPtr->xrecoff % XLogSegSize) / XLOG_BLCKSZ) * XLOG_BLCKSZ;
if (readOff != targetPageOff || needReread)
{
readOff = targetPageOff;
needReread = false;
if (lseek(readFile, (off_t) readOff, SEEK_SET) < 0)
{
ereport(emode,
(errcode_for_file_access(),
errmsg("could not seek in log file %u, segment %u to offset %u: %m",
readId, readSeg, readOff)));
goto next_record_is_invalid;
}
if (read(readFile, readBuf, XLOG_BLCKSZ) != XLOG_BLCKSZ)
{
ereport(emode,
(errcode_for_file_access(),
errmsg("could not read from log file %u, segment %u, offset %u: %m",
readId, readSeg, readOff)));
goto next_record_is_invalid;
}
if (!ValidXLOGHeader((XLogPageHeader) readBuf, emode))
goto next_record_is_invalid;
}
pageHeaderSize = XLogPageHeaderSize((XLogPageHeader) readBuf); pageHeaderSize = XLogPageHeaderSize((XLogPageHeader) readBuf);
targetRecOff = RecPtr->xrecoff % XLOG_BLCKSZ; targetRecOff = RecPtr->xrecoff % XLOG_BLCKSZ;
if (targetRecOff == 0) if (targetRecOff == 0)
...@@ -3737,8 +3626,6 @@ ReadRecord(XLogRecPtr *RecPtr, int emode_arg) ...@@ -3737,8 +3626,6 @@ ReadRecord(XLogRecPtr *RecPtr, int emode_arg)
} }
record = (XLogRecord *) ((char *) readBuf + RecPtr->xrecoff % XLOG_BLCKSZ); record = (XLogRecord *) ((char *) readBuf + RecPtr->xrecoff % XLOG_BLCKSZ);
got_record:;
/* /*
* xl_len == 0 is bad data for everything except XLOG SWITCH, where it is * xl_len == 0 is bad data for everything except XLOG SWITCH, where it is
* required. * required.
...@@ -3838,58 +3725,35 @@ got_record:; ...@@ -3838,58 +3725,35 @@ got_record:;
} }
buffer = readRecordBuf; buffer = readRecordBuf;
nextRecord = NULL;
len = XLOG_BLCKSZ - RecPtr->xrecoff % XLOG_BLCKSZ; len = XLOG_BLCKSZ - RecPtr->xrecoff % XLOG_BLCKSZ;
if (total_len > len) if (total_len > len)
{ {
/* Need to reassemble record */ /* Need to reassemble record */
XLogContRecord *contrecord; XLogContRecord *contrecord;
XLogRecPtr nextpagelsn = *RecPtr; XLogRecPtr pagelsn;
uint32 gotlen = len; uint32 gotlen = len;
/* Initialize pagelsn to the beginning of the page this record is on */
pagelsn = *RecPtr;
pagelsn.xrecoff = (pagelsn.xrecoff / XLOG_BLCKSZ) * XLOG_BLCKSZ;
memcpy(buffer, record, len); memcpy(buffer, record, len);
record = (XLogRecord *) buffer; record = (XLogRecord *) buffer;
buffer += len; buffer += len;
for (;;) for (;;)
{ {
/* Is the next page ready yet? */ /* Calculate pointer to beginning of next page */
if (InStreamingRecovery) pagelsn.xrecoff += XLOG_BLCKSZ;
if (pagelsn.xrecoff >= XLogFileSize)
{ {
if (gotlen != len) (pagelsn.xlogid)++;
nextpagelsn.xrecoff += XLOG_BLCKSZ; pagelsn.xrecoff = 0;
NextLogPage(nextpagelsn);
receivedUpto = WaitNextXLogAvailable(nextpagelsn, &finished);
if (finished)
{
if (emode_arg == PANIC)
ereport(PANIC,
(errmsg("streaming recovery ended")));
else
return NULL;
}
} }
/* Wait for the next page to become available */
if (!XLogPageRead(&pagelsn, emode, false, false))
return NULL;
readOff += XLOG_BLCKSZ; /* Check that the continuation record looks valid */
if (readOff >= XLogSegSize)
{
close(readFile);
readFile = -1;
NextLogSeg(readId, readSeg);
readFile = XLogFileRead(readId, readSeg, emode);
if (readFile < 0)
goto next_record_is_invalid;
readOff = 0;
}
if (read(readFile, readBuf, XLOG_BLCKSZ) != XLOG_BLCKSZ)
{
ereport(emode,
(errcode_for_file_access(),
errmsg("could not read from log file %u, segment %u, offset %u: %m",
readId, readSeg, readOff)));
goto next_record_is_invalid;
}
if (!ValidXLOGHeader((XLogPageHeader) readBuf, emode))
goto next_record_is_invalid;
if (!(((XLogPageHeader) readBuf)->xlp_info & XLP_FIRST_IS_CONTRECORD)) if (!(((XLogPageHeader) readBuf)->xlp_info & XLP_FIRST_IS_CONTRECORD))
{ {
ereport(emode, ereport(emode,
...@@ -3923,31 +3787,11 @@ got_record:; ...@@ -3923,31 +3787,11 @@ got_record:;
if (!RecordIsValid(record, *RecPtr, emode)) if (!RecordIsValid(record, *RecPtr, emode))
goto next_record_is_invalid; goto next_record_is_invalid;
pageHeaderSize = XLogPageHeaderSize((XLogPageHeader) readBuf); pageHeaderSize = XLogPageHeaderSize((XLogPageHeader) readBuf);
if (XLOG_BLCKSZ - SizeOfXLogRecord >= pageHeaderSize +
MAXALIGN(SizeOfXLogContRecord + contrecord->xl_rem_len))
{
nextRecord = (XLogRecord *) ((char *) contrecord +
MAXALIGN(SizeOfXLogContRecord + contrecord->xl_rem_len));
}
EndRecPtr.xlogid = readId; EndRecPtr.xlogid = readId;
EndRecPtr.xrecoff = readSeg * XLogSegSize + readOff + EndRecPtr.xrecoff = readSeg * XLogSegSize + readOff +
pageHeaderSize + pageHeaderSize +
MAXALIGN(SizeOfXLogContRecord + contrecord->xl_rem_len); MAXALIGN(SizeOfXLogContRecord + contrecord->xl_rem_len);
/*
* Check whether the current page needs to be read again. If there is no
* unread record in the current page (nextRecord == NULL), obviously we
* don't need to reread it. If we're not in streaming recovery mode yet,
* partially-filled page doesn't need to be reread because it is the
* last valid page.
*/
if (nextRecord != NULL && InStreamingRecovery &&
XLByteLE(receivedUpto, EndRecPtr))
{
nextRecord = NULL;
needReread = true;
}
ReadRecPtr = *RecPtr; ReadRecPtr = *RecPtr;
/* needn't worry about XLOG SWITCH, it can't cross page boundaries */ /* needn't worry about XLOG SWITCH, it can't cross page boundaries */
return record; return record;
...@@ -3956,26 +3800,9 @@ got_record:; ...@@ -3956,26 +3800,9 @@ got_record:;
/* Record does not cross a page boundary */ /* Record does not cross a page boundary */
if (!RecordIsValid(record, *RecPtr, emode)) if (!RecordIsValid(record, *RecPtr, emode))
goto next_record_is_invalid; goto next_record_is_invalid;
if (XLOG_BLCKSZ - SizeOfXLogRecord >= RecPtr->xrecoff % XLOG_BLCKSZ +
MAXALIGN(total_len))
nextRecord = (XLogRecord *) ((char *) record + MAXALIGN(total_len));
EndRecPtr.xlogid = RecPtr->xlogid; EndRecPtr.xlogid = RecPtr->xlogid;
EndRecPtr.xrecoff = RecPtr->xrecoff + MAXALIGN(total_len); EndRecPtr.xrecoff = RecPtr->xrecoff + MAXALIGN(total_len);
/*
* Check whether the current page needs to be read again. If there is no
* unread record in the current page (nextRecord == NULL), obviously we
* don't need to reread it. If we're not in streaming recovery mode yet,
* partially-filled page doesn't need to be reread because it is the last
* valid page.
*/
if (nextRecord != NULL && InStreamingRecovery &&
XLByteLE(receivedUpto, EndRecPtr))
{
nextRecord = NULL;
needReread = true;
}
ReadRecPtr = *RecPtr; ReadRecPtr = *RecPtr;
memcpy(buffer, record, total_len); memcpy(buffer, record, total_len);
...@@ -3987,8 +3814,6 @@ got_record:; ...@@ -3987,8 +3814,6 @@ got_record:;
/* Pretend it extends to end of segment */ /* Pretend it extends to end of segment */
EndRecPtr.xrecoff += XLogSegSize - 1; EndRecPtr.xrecoff += XLogSegSize - 1;
EndRecPtr.xrecoff -= EndRecPtr.xrecoff % XLogSegSize; EndRecPtr.xrecoff -= EndRecPtr.xrecoff % XLogSegSize;
nextRecord = NULL; /* definitely not on same page */
needReread = false;
/* /*
* Pretend that readBuf contains the last page of the segment. This is * Pretend that readBuf contains the last page of the segment. This is
...@@ -4005,7 +3830,6 @@ next_record_is_invalid:; ...@@ -4005,7 +3830,6 @@ next_record_is_invalid:;
close(readFile); close(readFile);
readFile = -1; readFile = -1;
} }
nextRecord = NULL;
return NULL; return NULL;
} }
...@@ -5730,7 +5554,7 @@ StartupXLOG(void) ...@@ -5730,7 +5554,7 @@ StartupXLOG(void)
(errmsg("checkpoint record is at %X/%X", (errmsg("checkpoint record is at %X/%X",
checkPointLoc.xlogid, checkPointLoc.xrecoff))); checkPointLoc.xlogid, checkPointLoc.xrecoff)));
} }
else if (InStreamingRecovery) else if (StandbyMode)
{ {
/* /*
* The last valid checkpoint record required for a streaming * The last valid checkpoint record required for a streaming
...@@ -5938,12 +5762,12 @@ StartupXLOG(void) ...@@ -5938,12 +5762,12 @@ StartupXLOG(void)
if (XLByteLT(checkPoint.redo, RecPtr)) if (XLByteLT(checkPoint.redo, RecPtr))
{ {
/* back up to find the record */ /* back up to find the record */
record = FetchRecord(&(checkPoint.redo), PANIC, false); record = ReadRecord(&(checkPoint.redo), PANIC, false);
} }
else else
{ {
/* just have to read next record after CheckPoint */ /* just have to read next record after CheckPoint */
record = FetchRecord(NULL, LOG, false); record = ReadRecord(NULL, LOG, false);
} }
if (record != NULL) if (record != NULL)
...@@ -6096,7 +5920,7 @@ StartupXLOG(void) ...@@ -6096,7 +5920,7 @@ StartupXLOG(void)
LastRec = ReadRecPtr; LastRec = ReadRecPtr;
record = FetchRecord(NULL, LOG, false); record = ReadRecord(NULL, LOG, false);
} while (record != NULL && recoveryContinue); } while (record != NULL && recoveryContinue);
/* /*
...@@ -6130,22 +5954,17 @@ StartupXLOG(void) ...@@ -6130,22 +5954,17 @@ StartupXLOG(void)
/* /*
* We are now done reading the xlog from stream. Turn off streaming * We are now done reading the xlog from stream. Turn off streaming
* recovery, and restart fetching the files (which would be required * recovery to force fetching the files (which would be required
* at end of recovery, e.g., timeline history file) from archive. * at end of recovery, e.g., timeline history file) from archive or
* pg_xlog.
*/ */
if (InStreamingRecovery) StandbyMode = false;
{
/* We are no longer in streaming recovery state */
InStreamingRecovery = false;
ereport(LOG,
(errmsg("streaming recovery complete")));
}
/* /*
* Re-fetch the last valid or last applied record, so we can identify the * Re-fetch the last valid or last applied record, so we can identify the
* exact endpoint of what we consider the valid portion of WAL. * exact endpoint of what we consider the valid portion of WAL.
*/ */
record = ReadRecord(&LastRec, PANIC); record = ReadRecord(&LastRec, PANIC, false);
EndOfLog = EndRecPtr; EndOfLog = EndRecPtr;
XLByteToPrevSeg(EndOfLog, endLogId, endLogSeg); XLByteToPrevSeg(EndOfLog, endLogId, endLogSeg);
...@@ -6515,7 +6334,7 @@ ReadCheckpointRecord(XLogRecPtr RecPtr, int whichChkpt) ...@@ -6515,7 +6334,7 @@ ReadCheckpointRecord(XLogRecPtr RecPtr, int whichChkpt)
return NULL; return NULL;
} }
record = FetchRecord(&RecPtr, LOG, true); record = ReadRecord(&RecPtr, LOG, true);
if (record == NULL) if (record == NULL)
{ {
...@@ -7461,10 +7280,6 @@ CreateRestartPoint(int flags) ...@@ -7461,10 +7280,6 @@ CreateRestartPoint(int flags)
} }
LWLockRelease(ControlFileLock); LWLockRelease(ControlFileLock);
/* Are we doing recovery from XLOG stream? */
if (!InStreamingRecovery)
InStreamingRecovery = WalRcvInProgress();
/* /*
* Delete old log files (those no longer needed even for previous * Delete old log files (those no longer needed even for previous
* checkpoint/restartpoint) to prevent the disk holding the xlog from * checkpoint/restartpoint) to prevent the disk holding the xlog from
...@@ -7472,7 +7287,7 @@ CreateRestartPoint(int flags) ...@@ -7472,7 +7287,7 @@ CreateRestartPoint(int flags)
* streaming recovery we have to or the disk will eventually fill up from * streaming recovery we have to or the disk will eventually fill up from
* old log files streamed from master. * old log files streamed from master.
*/ */
if (InStreamingRecovery && (_logId || _logSeg)) if (WalRcvInProgress() && (_logId || _logSeg))
{ {
XLogRecPtr endptr; XLogRecPtr endptr;
...@@ -8791,6 +8606,13 @@ HandleStartupProcInterrupts(void) ...@@ -8791,6 +8606,13 @@ HandleStartupProcInterrupts(void)
*/ */
if (shutdown_requested) if (shutdown_requested)
proc_exit(1); proc_exit(1);
/*
* Emergency bailout if postmaster has died. This is to avoid the
* necessity for manual cleanup of all postmaster children.
*/
if (IsUnderPostmaster && !PostmasterIsAlive(true))
exit(1);
} }
/* Main entry point for startup process */ /* Main entry point for startup process */
...@@ -8843,3 +8665,281 @@ StartupProcessMain(void) ...@@ -8843,3 +8665,281 @@ StartupProcessMain(void)
*/ */
proc_exit(0); proc_exit(0);
} }
/*
 * Read the XLOG page containing RecPtr into readBuf (if not read already).
 * Returns true if successful, false otherwise or fails if emode is PANIC.
 *
 * This is responsible for restoring files from archive as needed, as well
 * as for waiting for the requested WAL record to arrive in standby mode.
 *
 * RecPtr: location of the record the caller wants; the segment, the page
 * within the segment and the offset within the page are derived from it.
 * emode: error level used when reporting seek/read failures, passed on to
 * ValidXLOGHeader(), and (outside standby mode only) to
 * XLogFileReadAnyTLI().
 * fetching_ckpt: if true, streaming is requested starting at RedoStartLSN
 * instead of *RecPtr.
 * randAccess: if true, curFileTLI is reset to 0 before a segment is opened
 * with XLogFileReadAnyTLI().
 *
 * On success readFile, readId, readSeg, readOff, readLen and readStreamed
 * describe the page now held in readBuf. On failure after the file was
 * opened, the file is closed and readFile, readStreamed and readLen are
 * reset. The function-static receivedUpto caches, across calls, the last
 * value obtained from GetWalRcvWriteRecPtr().
 *
 * In standby mode this does not return until the record is available or
 * CheckForStandbyTrigger() reports that the trigger file was found.
 */
static bool
XLogPageRead(XLogRecPtr *RecPtr, int emode, bool fetching_ckpt,
bool randAccess)
{
static XLogRecPtr receivedUpto = {0, 0};
bool switched_segment = false;
uint32 targetPageOff;
uint32 targetRecOff;
uint32 targetId;
uint32 targetSeg;
/*
 * Work out which segment holds the record, the offset of the page's first
 * byte within that segment, and the record's offset within the page.
 */
XLByteToSeg(*RecPtr, targetId, targetSeg);
targetPageOff = ((RecPtr->xrecoff % XLogSegSize) / XLOG_BLCKSZ) * XLOG_BLCKSZ;
targetRecOff = RecPtr->xrecoff % XLOG_BLCKSZ;
/* Fast exit if we have read the record in the current buffer already */
if (targetId == readId && targetSeg == readSeg &&
targetPageOff == readOff && targetRecOff < readLen)
return true;
/*
 * See if we need to switch to a new segment because the requested record
 * is not in the currently open one.
 */
if (readFile >= 0 && !XLByteInSeg(*RecPtr, readId, readSeg))
{
close(readFile);
readFile = -1;
}
/* From here on readId/readSeg identify the segment we are trying to read */
XLByteToSeg(*RecPtr, readId, readSeg);
/*
 * See if we need to retrieve more data: either no segment file is open,
 * or the open one is being streamed and the cached receivedUpto does not
 * yet cover the requested record.
 */
if (readFile < 0 ||
(readStreamed && !XLByteLT(*RecPtr, receivedUpto)))
{
if (StandbyMode)
{
/* true once a restore attempt has failed during this call */
bool last_restore_failed = false;
/*
 * In standby mode, wait for the requested record to become
 * available, either via restore_command succeeding to restore
 * the segment, or via walreceiver having streamed the record.
 */
for (;;)
{
if (WalRcvInProgress())
{
/*
 * While walreceiver is active, wait for new WAL to
 * arrive from primary.
 */
receivedUpto = GetWalRcvWriteRecPtr();
if (XLByteLT(*RecPtr, receivedUpto))
{
/*
 * Great, streamed far enough. Open the file if it's
 * not open already.
 */
if (readFile < 0)
{
/*
 * PANIC is passed as the error level here rather than
 * emode (presumably because a segment that walreceiver
 * reports as written must exist -- confirm).
 */
readFile =
XLogFileRead(readId, readSeg, PANIC,
recoveryTargetTLI, false, false);
switched_segment = true;
readStreamed = true;
}
break;
}
/* Not received yet; give up if failover has been triggered */
if (CheckForStandbyTrigger())
goto next_record_is_invalid;
/*
 * When streaming is active, we want to react quickly when
 * the next WAL record arrives, so sleep only a bit.
 */
pg_usleep(100000L); /* 100ms */
}
else
{
/*
 * Until walreceiver manages to reconnect, poll the
 * archive.
 */
if (readFile >= 0)
{
close(readFile);
readFile = -1;
}
/* Reset curFileTLI if random fetch. */
if (randAccess)
curFileTLI = 0;
/* DEBUG2, not emode: a missing segment is expected while polling */
readFile = XLogFileReadAnyTLI(readId, readSeg, DEBUG2, true);
switched_segment = true;
readStreamed = false;
if (readFile != -1)
{
elog(DEBUG1, "got WAL segment from archive");
break;
}
/*
 * The segment could not be restored. If this is the first
 * failure during this call (or we haven't tried streaming
 * yet), go on to request streaming immediately. But if a
 * restore attempt has already failed earlier in this loop,
 * assume the problem is persistent, so be less aggressive:
 * check for the trigger file and sleep before asking for
 * streaming again.
 */
if (last_restore_failed)
{
/*
 * Check to see if the trigger file exists. Note that
 * we do this only after failure, so when you create
 * the trigger file, we still finish replaying as much
 * as we can before failover.
 */
if (CheckForStandbyTrigger())
goto next_record_is_invalid;
pg_usleep(5000000L); /* 5 seconds */
}
last_restore_failed = true;
/*
 * Nope, not found in archive. Try to stream it.
 *
 * If fetching_ckpt is TRUE, RecPtr points to the initial
 * checkpoint location. In that case, we use RedoStartLSN
 * as the streaming start position instead of RecPtr, so
 * that when we later jump backwards to start redo at
 * RedoStartLSN, we will have the logs streamed already.
 */
RequestXLogStreaming(fetching_ckpt ? RedoStartLSN : *RecPtr,
PrimaryConnInfo);
}
/*
 * This possibly-long loop needs to handle interrupts of startup
 * process.
 */
HandleStartupProcInterrupts();
}
}
else
{
/* In archive or crash recovery. */
if (readFile < 0)
{
/* Reset curFileTLI if random fetch. */
if (randAccess)
curFileTLI = 0;
readFile = XLogFileReadAnyTLI(readId, readSeg, emode,
InArchiveRecovery);
switched_segment = true;
readStreamed = false;
/* No waiting outside standby mode: a missing segment is a failure */
if (readFile < 0)
return false;
}
}
}
/*
 * At this point, we have the right segment open and we know the
 * requested record is in it.
 */
Assert(readFile != -1);
/*
 * If the current segment is being streamed from master, calculate
 * how much of the current page we have received already. We know the
 * requested record has been received, but this is for the benefit
 * of future calls, to allow quick exit at the top of this function.
 */
if (readStreamed)
{
/*
 * If receivedUpto lies in a different page than the requested record,
 * the whole requested page is treated as valid; otherwise only the part
 * of the page up to receivedUpto is.
 */
if (RecPtr->xlogid != receivedUpto.xlogid ||
(RecPtr->xrecoff / XLOG_BLCKSZ) != (receivedUpto.xrecoff / XLOG_BLCKSZ))
{
readLen = XLOG_BLCKSZ;
}
else
readLen = receivedUpto.xrecoff % XLogSegSize - targetPageOff;
}
else
readLen = XLOG_BLCKSZ;
if (switched_segment && targetPageOff != 0)
{
/*
 * Whenever switching to a new WAL segment, we read the first page of
 * the file and validate its header, even if that's not where the
 * target record is. This is so that we can check the additional
 * identification info that is present in the first page's "long"
 * header.
 *
 * NOTE(review): there is no lseek() before this read, so it relies on
 * the just-opened file being positioned at offset 0 -- confirm.
 */
readOff = 0;
if (read(readFile, readBuf, XLOG_BLCKSZ) != XLOG_BLCKSZ)
{
ereport(emode,
(errcode_for_file_access(),
errmsg("could not read from log file %u, segment %u, offset %u: %m",
readId, readSeg, readOff)));
goto next_record_is_invalid;
}
if (!ValidXLOGHeader((XLogPageHeader) readBuf, emode))
goto next_record_is_invalid;
}
/* Read the requested page */
readOff = targetPageOff;
if (lseek(readFile, (off_t) readOff, SEEK_SET) < 0)
{
ereport(emode,
(errcode_for_file_access(),
errmsg("could not seek in log file %u, segment %u to offset %u: %m",
readId, readSeg, readOff)));
goto next_record_is_invalid;
}
if (read(readFile, readBuf, XLOG_BLCKSZ) != XLOG_BLCKSZ)
{
ereport(emode,
(errcode_for_file_access(),
errmsg("could not read from log file %u, segment %u, offset %u: %m",
readId, readSeg, readOff)));
goto next_record_is_invalid;
}
if (!ValidXLOGHeader((XLogPageHeader) readBuf, emode))
goto next_record_is_invalid;
/* Sanity checks: the buffer state must now satisfy the fast-exit test */
Assert(targetId == readId);
Assert(targetSeg == readSeg);
Assert(targetPageOff == readOff);
Assert(targetRecOff < readLen);
return true;
/*
 * Common failure exit: close the segment file if one is open and reset the
 * read state. Setting readLen to 0 makes the fast-exit test at the top of
 * this function fail on the next call.
 */
next_record_is_invalid:
if (readFile >= 0)
close(readFile);
readFile = -1;
readStreamed = false;
readLen = 0;
return false;
}
/*
 * Test whether failover has been requested through the trigger file.
 *
 * Returns false when no trigger file is configured (TriggerFile is NULL)
 * or when stat() on it does not succeed. Otherwise logs that the file was
 * found, calls ShutdownWalRcv() to stop walreceiver and wait for it to
 * exit, removes the trigger file, and returns true.
 */
static bool
CheckForStandbyTrigger(void)
{
	struct stat	trigger_stat;

	/* Nothing to do unless a trigger file is configured and present */
	if (TriggerFile == NULL || stat(TriggerFile, &trigger_stat) != 0)
		return false;

	ereport(LOG,
			(errmsg("trigger file found: %s", TriggerFile)));

	/* Stop streaming first, then remove the file */
	ShutdownWalRcv();
	unlink(TriggerFile);

	return true;
}
...@@ -37,7 +37,7 @@ ...@@ -37,7 +37,7 @@
* *
* *
* IDENTIFICATION * IDENTIFICATION
* $PostgreSQL: pgsql/src/backend/postmaster/postmaster.c,v 1.601 2010/01/15 09:19:02 heikki Exp $ * $PostgreSQL: pgsql/src/backend/postmaster/postmaster.c,v 1.602 2010/01/27 15:27:50 heikki Exp $
* *
* NOTES * NOTES
* *
...@@ -224,9 +224,6 @@ static int Shutdown = NoShutdown; ...@@ -224,9 +224,6 @@ static int Shutdown = NoShutdown;
static bool FatalError = false; /* T if recovering from backend crash */ static bool FatalError = false; /* T if recovering from backend crash */
static bool RecoveryError = false; /* T if WAL recovery failed */ static bool RecoveryError = false; /* T if WAL recovery failed */
/* If WalReceiverActive is true, restart walreceiver if it dies */
static bool WalReceiverActive = false;
/* /*
* We use a simple state machine to control startup, shutdown, and * We use a simple state machine to control startup, shutdown, and
* crash recovery (which is rather like shutdown followed by startup). * crash recovery (which is rather like shutdown followed by startup).
...@@ -1469,11 +1466,6 @@ ServerLoop(void) ...@@ -1469,11 +1466,6 @@ ServerLoop(void)
if (PgStatPID == 0 && pmState == PM_RUN) if (PgStatPID == 0 && pmState == PM_RUN)
PgStatPID = pgstat_start(); PgStatPID = pgstat_start();
/* If we have lost walreceiver, try to start a new one */
if (WalReceiverPID == 0 && WalReceiverActive &&
(pmState == PM_RECOVERY || pmState == PM_RECOVERY_CONSISTENT))
WalReceiverPID = StartWalReceiver();
/* If we need to signal the autovacuum launcher, do so now */ /* If we need to signal the autovacuum launcher, do so now */
if (avlauncher_needs_signal) if (avlauncher_needs_signal)
{ {
...@@ -4167,16 +4159,9 @@ sigusr1_handler(SIGNAL_ARGS) ...@@ -4167,16 +4159,9 @@ sigusr1_handler(SIGNAL_ARGS)
WalReceiverPID == 0) WalReceiverPID == 0)
{ {
/* Startup Process wants us to start the walreceiver process. */ /* Startup Process wants us to start the walreceiver process. */
WalReceiverActive = true;
WalReceiverPID = StartWalReceiver(); WalReceiverPID = StartWalReceiver();
} }
if (CheckPostmasterSignal(PMSIGNAL_SHUTDOWN_WALRECEIVER))
{
/* The walreceiver process doesn't want to be restarted anymore */
WalReceiverActive = false;
}
PG_SETMASK(&UnBlockSig); PG_SETMASK(&UnBlockSig);
errno = save_errno; errno = save_errno;
......
...@@ -29,7 +29,7 @@ ...@@ -29,7 +29,7 @@
* *
* *
* IDENTIFICATION * IDENTIFICATION
* $PostgreSQL: pgsql/src/backend/replication/walreceiver.c,v 1.1 2010/01/20 09:16:24 heikki Exp $ * $PostgreSQL: pgsql/src/backend/replication/walreceiver.c,v 1.2 2010/01/27 15:27:51 heikki Exp $
* *
*------------------------------------------------------------------------- *-------------------------------------------------------------------------
*/ */
...@@ -134,8 +134,7 @@ static void WalRcvShutdownHandler(SIGNAL_ARGS); ...@@ -134,8 +134,7 @@ static void WalRcvShutdownHandler(SIGNAL_ARGS);
static void WalRcvQuickDieHandler(SIGNAL_ARGS); static void WalRcvQuickDieHandler(SIGNAL_ARGS);
/* Prototypes for private functions */ /* Prototypes for private functions */
static void InitWalRcv(void); static void WalRcvDie(int code, Datum arg);
static void WalRcvKill(int code, Datum arg);
static void XLogWalRcvWrite(char *buf, Size nbytes, XLogRecPtr recptr); static void XLogWalRcvWrite(char *buf, Size nbytes, XLogRecPtr recptr);
static void XLogWalRcvFlush(void); static void XLogWalRcvFlush(void);
...@@ -153,21 +152,57 @@ static struct ...@@ -153,21 +152,57 @@ static struct
void void
WalReceiverMain(void) WalReceiverMain(void)
{ {
sigjmp_buf local_sigjmp_buf;
MemoryContext walrcv_context;
char conninfo[MAXCONNINFO]; char conninfo[MAXCONNINFO];
XLogRecPtr startpoint; XLogRecPtr startpoint;
/* use volatile pointer to prevent code rearrangement */ /* use volatile pointer to prevent code rearrangement */
volatile WalRcvData *walrcv = WalRcv; volatile WalRcvData *walrcv = WalRcv;
/* Load the libpq-specific functions */ /*
load_file("libpqwalreceiver", false); * WalRcv should be set up already (if we are a backend, we inherit
if (walrcv_connect == NULL || walrcv_receive == NULL || * this by fork() or EXEC_BACKEND mechanism from the postmaster).
walrcv_disconnect == NULL) */
elog(ERROR, "libpqwalreceiver didn't initialize correctly"); Assert(walrcv != NULL);
/*
* Mark walreceiver as running in shared memory.
*
* Do this as early as possible, so that if we fail later on, we'll
* set state to STOPPED. If we die before this, the startup process
* will keep waiting for us to start up, until it times out.
*/
SpinLockAcquire(&walrcv->mutex);
Assert(walrcv->pid == 0);
switch(walrcv->walRcvState)
{
case WALRCV_STOPPING:
/* If we've already been requested to stop, don't start up. */
walrcv->walRcvState = WALRCV_STOPPED;
/* fall through */
case WALRCV_STOPPED:
SpinLockRelease(&walrcv->mutex);
proc_exit(1);
break;
case WALRCV_STARTING:
/* The usual case */
break;
case WALRCV_RUNNING:
/* Shouldn't happen */
elog(PANIC, "walreceiver still running according to shared memory state");
}
/* Advertise our PID so that the startup process can kill us */
walrcv->pid = MyProcPid;
walrcv->walRcvState = WALRCV_RUNNING;
/* Fetch information required to start streaming */
strlcpy(conninfo, (char *) walrcv->conninfo, MAXCONNINFO);
startpoint = walrcv->receivedUpto;
SpinLockRelease(&walrcv->mutex);
/* Mark walreceiver in progress */ /* Arrange to clean up at walreceiver exit */
InitWalRcv(); on_shmem_exit(WalRcvDie, 0);
/* /*
* If possible, make this process a group leader, so that the postmaster * If possible, make this process a group leader, so that the postmaster
...@@ -200,81 +235,21 @@ WalReceiverMain(void) ...@@ -200,81 +235,21 @@ WalReceiverMain(void)
/* We allow SIGQUIT (quickdie) at all times */ /* We allow SIGQUIT (quickdie) at all times */
sigdelset(&BlockSig, SIGQUIT); sigdelset(&BlockSig, SIGQUIT);
/* Load the libpq-specific functions */
load_file("libpqwalreceiver", false);
if (walrcv_connect == NULL || walrcv_receive == NULL ||
walrcv_disconnect == NULL)
elog(ERROR, "libpqwalreceiver didn't initialize correctly");
/* /*
* Create a resource owner to keep track of our resources (not clear that * Create a resource owner to keep track of our resources (not clear that
* we need this, but may as well have one). * we need this, but may as well have one).
*/ */
CurrentResourceOwner = ResourceOwnerCreate(NULL, "Wal Receiver"); CurrentResourceOwner = ResourceOwnerCreate(NULL, "Wal Receiver");
/*
* Create a memory context that we will do all our work in. We do this so
* that we can reset the context during error recovery and thereby avoid
* possible memory leaks.
*/
walrcv_context = AllocSetContextCreate(TopMemoryContext,
"Wal Receiver",
ALLOCSET_DEFAULT_MINSIZE,
ALLOCSET_DEFAULT_INITSIZE,
ALLOCSET_DEFAULT_MAXSIZE);
MemoryContextSwitchTo(walrcv_context);
/*
* If an exception is encountered, processing resumes here.
*
* This code is heavily based on bgwriter.c, q.v.
*/
if (sigsetjmp(local_sigjmp_buf, 1) != 0)
{
/* Since not using PG_TRY, must reset error stack by hand */
error_context_stack = NULL;
/* Reset WalRcvImmediateInterruptOK */
DisableWalRcvImmediateExit();
/* Prevent interrupts while cleaning up */
HOLD_INTERRUPTS();
/* Report the error to the server log */
EmitErrorReport();
/* Disconnect any previous connection. */
EnableWalRcvImmediateExit();
walrcv_disconnect();
DisableWalRcvImmediateExit();
/*
* Now return to normal top-level context and clear ErrorContext for
* next time.
*/
MemoryContextSwitchTo(walrcv_context);
FlushErrorState();
/* Flush any leaked data in the top-level context */
MemoryContextResetAndDeleteChildren(walrcv_context);
/* Now we can allow interrupts again */
RESUME_INTERRUPTS();
/*
* Sleep at least 1 second after any error. A write error is likely
* to be repeated, and we don't want to be filling the error logs as
* fast as we can.
*/
pg_usleep(1000000L);
}
/* We can now handle ereport(ERROR) */
PG_exception_stack = &local_sigjmp_buf;
/* Unblock signals (they were blocked when the postmaster forked us) */ /* Unblock signals (they were blocked when the postmaster forked us) */
PG_SETMASK(&UnBlockSig); PG_SETMASK(&UnBlockSig);
/* Fetch connection information from shared memory */
SpinLockAcquire(&walrcv->mutex);
strlcpy(conninfo, (char *) walrcv->conninfo, MAXCONNINFO);
startpoint = walrcv->receivedUpto;
SpinLockRelease(&walrcv->mutex);
/* Establish the connection to the primary for XLOG streaming */ /* Establish the connection to the primary for XLOG streaming */
EnableWalRcvImmediateExit(); EnableWalRcvImmediateExit();
walrcv_connect(conninfo, startpoint); walrcv_connect(conninfo, startpoint);
...@@ -330,63 +305,24 @@ WalReceiverMain(void) ...@@ -330,63 +305,24 @@ WalReceiverMain(void)
} }
} }
/* Advertise our pid in shared memory, so that startup process can kill us. */
static void
InitWalRcv(void)
{
/* use volatile pointer to prevent code rearrangement */
volatile WalRcvData *walrcv = WalRcv;
/*
* WalRcv should be set up already (if we are a backend, we inherit
* this by fork() or EXEC_BACKEND mechanism from the postmaster).
*/
if (walrcv == NULL)
elog(PANIC, "walreceiver control data uninitialized");
/* If we've already been requested to stop, don't start up */
SpinLockAcquire(&walrcv->mutex);
Assert(walrcv->pid == 0);
if (walrcv->walRcvState == WALRCV_STOPPED ||
walrcv->walRcvState == WALRCV_STOPPING)
{
walrcv->walRcvState = WALRCV_STOPPED;
SpinLockRelease(&walrcv->mutex);
proc_exit(1);
}
walrcv->pid = MyProcPid;
SpinLockRelease(&walrcv->mutex);
/* Arrange to clean up at walreceiver exit */
on_shmem_exit(WalRcvKill, 0);
}
/* /*
* Clear our pid from shared memory at exit. * Mark us as STOPPED in shared memory at exit.
*/ */
static void static void
WalRcvKill(int code, Datum arg) WalRcvDie(int code, Datum arg)
{ {
/* use volatile pointer to prevent code rearrangement */ /* use volatile pointer to prevent code rearrangement */
volatile WalRcvData *walrcv = WalRcv; volatile WalRcvData *walrcv = WalRcv;
bool stopped = false;
SpinLockAcquire(&walrcv->mutex); SpinLockAcquire(&walrcv->mutex);
if (walrcv->walRcvState == WALRCV_STOPPING || Assert(walrcv->walRcvState == WALRCV_RUNNING ||
walrcv->walRcvState == WALRCV_STOPPED) walrcv->walRcvState == WALRCV_STOPPING);
{ walrcv->walRcvState = WALRCV_STOPPED;
walrcv->walRcvState = WALRCV_STOPPED;
stopped = true;
elog(LOG, "walreceiver stopped");
}
walrcv->pid = 0; walrcv->pid = 0;
SpinLockRelease(&walrcv->mutex); SpinLockRelease(&walrcv->mutex);
/* Terminate the connection gracefully. */
walrcv_disconnect(); walrcv_disconnect();
/* If requested to stop, tell postmaster to not restart us. */
if (stopped)
SendPostmasterSignal(PMSIGNAL_SHUTDOWN_WALRECEIVER);
} }
/* SIGHUP: set flag to re-read config file at next convenient time */ /* SIGHUP: set flag to re-read config file at next convenient time */
......
...@@ -10,7 +10,7 @@ ...@@ -10,7 +10,7 @@
* *
* *
* IDENTIFICATION * IDENTIFICATION
* $PostgreSQL: pgsql/src/backend/replication/walreceiverfuncs.c,v 1.2 2010/01/20 09:16:24 heikki Exp $ * $PostgreSQL: pgsql/src/backend/replication/walreceiverfuncs.c,v 1.3 2010/01/27 15:27:51 heikki Exp $
* *
*------------------------------------------------------------------------- *-------------------------------------------------------------------------
*/ */
...@@ -18,6 +18,8 @@ ...@@ -18,6 +18,8 @@
#include <sys/types.h> #include <sys/types.h>
#include <sys/stat.h> #include <sys/stat.h>
#include <sys/time.h>
#include <time.h>
#include <unistd.h> #include <unistd.h>
#include <signal.h> #include <signal.h>
...@@ -30,8 +32,11 @@ ...@@ -30,8 +32,11 @@
WalRcvData *WalRcv = NULL; WalRcvData *WalRcv = NULL;
static bool CheckForStandbyTrigger(void); /*
static void ShutdownWalRcv(void); * How long to wait for walreceiver to start up after requesting
* postmaster to launch it. In seconds.
*/
#define WALRCV_STARTUP_TIMEOUT 10
/* Report shared memory space needed by WalRcvShmemInit */ /* Report shared memory space needed by WalRcvShmemInit */
Size Size
...@@ -62,7 +67,7 @@ WalRcvShmemInit(void) ...@@ -62,7 +67,7 @@ WalRcvShmemInit(void)
/* Initialize the data structures */ /* Initialize the data structures */
MemSet(WalRcv, 0, WalRcvShmemSize()); MemSet(WalRcv, 0, WalRcvShmemSize());
WalRcv->walRcvState = WALRCV_NOT_STARTED; WalRcv->walRcvState = WALRCV_STOPPED;
SpinLockInit(&WalRcv->mutex); SpinLockInit(&WalRcv->mutex);
} }
...@@ -73,90 +78,51 @@ WalRcvInProgress(void) ...@@ -73,90 +78,51 @@ WalRcvInProgress(void)
/* use volatile pointer to prevent code rearrangement */ /* use volatile pointer to prevent code rearrangement */
volatile WalRcvData *walrcv = WalRcv; volatile WalRcvData *walrcv = WalRcv;
WalRcvState state; WalRcvState state;
pg_time_t startTime;
SpinLockAcquire(&walrcv->mutex); SpinLockAcquire(&walrcv->mutex);
state = walrcv->walRcvState;
SpinLockRelease(&walrcv->mutex);
if (state == WALRCV_RUNNING || state == WALRCV_STOPPING) state = walrcv->walRcvState;
return true; startTime = walrcv->startTime;
else
return false;
}
/*
* Wait for the XLOG record at given position to become available.
*
* 'recptr' indicates the byte position which caller wants to read the
* XLOG record up to. The byte position actually written and flushed
* by walreceiver is returned. It can be higher than the requested
* location, and the caller can safely read up to that point without
* calling WaitNextXLogAvailable() again.
*
* If WAL streaming is ended (because a trigger file is found), *finished
* is set to true and function returns immediately. The returned position
* can be lower than requested in that case.
*
* Called by the startup process during streaming recovery.
*/
XLogRecPtr
WaitNextXLogAvailable(XLogRecPtr recptr, bool *finished)
{
static XLogRecPtr receivedUpto = {0, 0};
*finished = false;
/* Quick exit if already known available */ SpinLockRelease(&walrcv->mutex);
if (XLByteLT(recptr, receivedUpto))
return receivedUpto;
for (;;) /*
* If it has taken too long for walreceiver to start up, give up.
* Setting the state to STOPPED ensures that if walreceiver later
* does start up after all, it will see that it's not supposed to be
* running and die without doing anything.
*/
if (state == WALRCV_STARTING)
{ {
/* use volatile pointer to prevent code rearrangement */ pg_time_t now = (pg_time_t) time(NULL);
volatile WalRcvData *walrcv = WalRcv;
/* Update local status */
SpinLockAcquire(&walrcv->mutex);
receivedUpto = walrcv->receivedUpto;
SpinLockRelease(&walrcv->mutex);
/* If available already, leave here */ if ((now - startTime) > WALRCV_STARTUP_TIMEOUT)
if (XLByteLT(recptr, receivedUpto))
return receivedUpto;
/* Check to see if the trigger file exists */
if (CheckForStandbyTrigger())
{ {
*finished = true; SpinLockAcquire(&walrcv->mutex);
return receivedUpto;
}
pg_usleep(100000L); /* 100ms */ if (walrcv->walRcvState == WALRCV_STARTING)
state = walrcv->walRcvState = WALRCV_STOPPED;
/*
* This possibly-long loop needs to handle interrupts of startup
* process.
*/
HandleStartupProcInterrupts();
/* SpinLockRelease(&walrcv->mutex);
* Emergency bailout if postmaster has died. This is to avoid the }
* necessity for manual cleanup of all postmaster children.
*/
if (!PostmasterIsAlive(true))
exit(1);
} }
if (state != WALRCV_STOPPED)
return true;
else
return false;
} }
/* /*
* Stop walreceiver and wait for it to die. * Stop walreceiver (if running) and wait for it to die.
*/ */
static void void
ShutdownWalRcv(void) ShutdownWalRcv(void)
{ {
/* use volatile pointer to prevent code rearrangement */ /* use volatile pointer to prevent code rearrangement */
volatile WalRcvData *walrcv = WalRcv; volatile WalRcvData *walrcv = WalRcv;
pid_t walrcvpid; pid_t walrcvpid = 0;
/* /*
* Request walreceiver to stop. Walreceiver will switch to WALRCV_STOPPED * Request walreceiver to stop. Walreceiver will switch to WALRCV_STOPPED
...@@ -164,15 +130,25 @@ ShutdownWalRcv(void) ...@@ -164,15 +130,25 @@ ShutdownWalRcv(void)
* restart itself. * restart itself.
*/ */
SpinLockAcquire(&walrcv->mutex); SpinLockAcquire(&walrcv->mutex);
Assert(walrcv->walRcvState == WALRCV_RUNNING); switch(walrcv->walRcvState)
walrcv->walRcvState = WALRCV_STOPPING; {
walrcvpid = walrcv->pid; case WALRCV_STOPPED:
break;
case WALRCV_STARTING:
walrcv->walRcvState = WALRCV_STOPPED;
break;
case WALRCV_RUNNING:
walrcv->walRcvState = WALRCV_STOPPING;
/* fall through */
case WALRCV_STOPPING:
walrcvpid = walrcv->pid;
break;
}
SpinLockRelease(&walrcv->mutex); SpinLockRelease(&walrcv->mutex);
/* /*
* Pid can be 0, if no walreceiver process is active right now. * Signal walreceiver process if it was still running.
* Postmaster should restart it, and when it does, it will see the
* STOPPING state.
*/ */
if (walrcvpid != 0) if (walrcvpid != 0)
kill(walrcvpid, SIGTERM); kill(walrcvpid, SIGTERM);
...@@ -193,30 +169,6 @@ ShutdownWalRcv(void) ...@@ -193,30 +169,6 @@ ShutdownWalRcv(void)
} }
} }
/*
* Check to see if the trigger file exists. If it does, request postmaster
* to shut down walreceiver and wait for it to exit, and remove the trigger
* file.
*/
static bool
CheckForStandbyTrigger(void)
{
struct stat stat_buf;
if (TriggerFile == NULL)
return false;
if (stat(TriggerFile, &stat_buf) == 0)
{
ereport(LOG,
(errmsg("trigger file found: %s", TriggerFile)));
ShutdownWalRcv();
unlink(TriggerFile);
return true;
}
return false;
}
/* /*
* Request postmaster to start walreceiver. * Request postmaster to start walreceiver.
* *
...@@ -228,17 +180,30 @@ RequestXLogStreaming(XLogRecPtr recptr, const char *conninfo) ...@@ -228,17 +180,30 @@ RequestXLogStreaming(XLogRecPtr recptr, const char *conninfo)
{ {
/* use volatile pointer to prevent code rearrangement */ /* use volatile pointer to prevent code rearrangement */
volatile WalRcvData *walrcv = WalRcv; volatile WalRcvData *walrcv = WalRcv;
pg_time_t now = (pg_time_t) time(NULL);
Assert(walrcv->walRcvState == WALRCV_NOT_STARTED); /*
* We always start at the beginning of the segment.
* That prevents a broken segment (i.e., with no records in the
* first half of a segment) from being created by XLOG streaming,
* which might cause trouble later on if the segment is e.g
* archived.
*/
if (recptr.xrecoff % XLogSegSize != 0)
recptr.xrecoff -= recptr.xrecoff % XLogSegSize;
/* It better be stopped before we try to restart it */
Assert(walrcv->walRcvState == WALRCV_STOPPED);
/* locking is just pro forma here; walreceiver isn't started yet */
SpinLockAcquire(&walrcv->mutex); SpinLockAcquire(&walrcv->mutex);
walrcv->receivedUpto = recptr;
if (conninfo != NULL) if (conninfo != NULL)
strlcpy((char *) walrcv->conninfo, conninfo, MAXCONNINFO); strlcpy((char *) walrcv->conninfo, conninfo, MAXCONNINFO);
else else
walrcv->conninfo[0] = '\0'; walrcv->conninfo[0] = '\0';
walrcv->walRcvState = WALRCV_RUNNING; walrcv->walRcvState = WALRCV_STARTING;
walrcv->startTime = now;
walrcv->receivedUpto = recptr;
SpinLockRelease(&walrcv->mutex); SpinLockRelease(&walrcv->mutex);
SendPostmasterSignal(PMSIGNAL_START_WALRECEIVER); SendPostmasterSignal(PMSIGNAL_START_WALRECEIVER);
...@@ -260,3 +225,4 @@ GetWalRcvWriteRecPtr(void) ...@@ -260,3 +225,4 @@ GetWalRcvWriteRecPtr(void)
return recptr; return recptr;
} }
...@@ -5,7 +5,7 @@ ...@@ -5,7 +5,7 @@
* *
* Portions Copyright (c) 2010-2010, PostgreSQL Global Development Group * Portions Copyright (c) 2010-2010, PostgreSQL Global Development Group
* *
* $PostgreSQL: pgsql/src/include/replication/walreceiver.h,v 1.4 2010/01/20 18:54:27 heikki Exp $ * $PostgreSQL: pgsql/src/include/replication/walreceiver.h,v 1.5 2010/01/27 15:27:51 heikki Exp $
* *
*------------------------------------------------------------------------- *-------------------------------------------------------------------------
*/ */
...@@ -27,10 +27,10 @@ ...@@ -27,10 +27,10 @@
*/ */
typedef enum typedef enum
{ {
WALRCV_NOT_STARTED, WALRCV_STOPPED, /* stopped and mustn't start up again */
WALRCV_RUNNING, /* walreceiver has been started */ WALRCV_STARTING, /* launched, but the process hasn't initialized yet */
WALRCV_STOPPING, /* requested to stop, but still running */ WALRCV_RUNNING, /* walreceiver is running */
WALRCV_STOPPED /* stopped and mustn't start up again */ WALRCV_STOPPING /* requested to stop, but still running */
} WalRcvState; } WalRcvState;
/* Shared memory area for management of walreceiver process */ /* Shared memory area for management of walreceiver process */
...@@ -47,6 +47,7 @@ typedef struct ...@@ -47,6 +47,7 @@ typedef struct
*/ */
pid_t pid; pid_t pid;
WalRcvState walRcvState; WalRcvState walRcvState;
pg_time_t startTime;
/* /*
* receivedUpto-1 is the last byte position that has been already * receivedUpto-1 is the last byte position that has been already
...@@ -74,6 +75,7 @@ extern PGDLLIMPORT walrcv_disconnect_type walrcv_disconnect; ...@@ -74,6 +75,7 @@ extern PGDLLIMPORT walrcv_disconnect_type walrcv_disconnect;
extern void WalReceiverMain(void); extern void WalReceiverMain(void);
extern Size WalRcvShmemSize(void); extern Size WalRcvShmemSize(void);
extern void WalRcvShmemInit(void); extern void WalRcvShmemInit(void);
extern void ShutdownWalRcv(void);
extern bool WalRcvInProgress(void); extern bool WalRcvInProgress(void);
extern XLogRecPtr WaitNextXLogAvailable(XLogRecPtr recptr, bool *finished); extern XLogRecPtr WaitNextXLogAvailable(XLogRecPtr recptr, bool *finished);
extern void RequestXLogStreaming(XLogRecPtr recptr, const char *conninfo); extern void RequestXLogStreaming(XLogRecPtr recptr, const char *conninfo);
......
...@@ -7,7 +7,7 @@ ...@@ -7,7 +7,7 @@
* Portions Copyright (c) 1996-2010, PostgreSQL Global Development Group * Portions Copyright (c) 1996-2010, PostgreSQL Global Development Group
* Portions Copyright (c) 1994, Regents of the University of California * Portions Copyright (c) 1994, Regents of the University of California
* *
* $PostgreSQL: pgsql/src/include/storage/pmsignal.h,v 1.28 2010/01/15 09:19:09 heikki Exp $ * $PostgreSQL: pgsql/src/include/storage/pmsignal.h,v 1.29 2010/01/27 15:27:51 heikki Exp $
* *
*------------------------------------------------------------------------- *-------------------------------------------------------------------------
*/ */
...@@ -30,7 +30,6 @@ typedef enum ...@@ -30,7 +30,6 @@ typedef enum
PMSIGNAL_START_AUTOVAC_LAUNCHER, /* start an autovacuum launcher */ PMSIGNAL_START_AUTOVAC_LAUNCHER, /* start an autovacuum launcher */
PMSIGNAL_START_AUTOVAC_WORKER, /* start an autovacuum worker */ PMSIGNAL_START_AUTOVAC_WORKER, /* start an autovacuum worker */
PMSIGNAL_START_WALRECEIVER, /* start a walreceiver */ PMSIGNAL_START_WALRECEIVER, /* start a walreceiver */
PMSIGNAL_SHUTDOWN_WALRECEIVER, /* shut down a walreceiver */
NUM_PMSIGNALS /* Must be last value of enum! */ NUM_PMSIGNALS /* Must be last value of enum! */
} PMSignalReason; } PMSignalReason;
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment