Commit a068c391 authored by Heikki Linnakangas's avatar Heikki Linnakangas

Refactor the code implementing standby-mode logic.

It is now easier to see that it's a state machine, making the code easier
to understand overall.
parent e2b3c21b
...@@ -506,12 +506,18 @@ static XLogwrtResult LogwrtResult = {0, 0}; ...@@ -506,12 +506,18 @@ static XLogwrtResult LogwrtResult = {0, 0};
/* /*
* Codes indicating where we got a WAL file from during recovery, or where * Codes indicating where we got a WAL file from during recovery, or where
* to attempt to get one. These are chosen so that they can be OR'd together * to attempt to get one.
* in a bitmask state variable.
*/ */
#define XLOG_FROM_ARCHIVE (1<<0) /* Restored using restore_command */ typedef enum
#define XLOG_FROM_PG_XLOG (1<<1) /* Existing file in pg_xlog */ {
#define XLOG_FROM_STREAM (1<<2) /* Streamed from master */ XLOG_FROM_ANY = 0, /* request to read WAL from any source */
XLOG_FROM_ARCHIVE, /* restored using restore_command */
XLOG_FROM_PG_XLOG, /* existing file in pg_xlog */
XLOG_FROM_STREAM, /* streamed from master */
} XLogSource;
/* human-readable names for XLogSources, for debugging output */
static const char *xlogSourceNames[] = { "any", "archive", "pg_xlog", "stream" };
/* /*
* openLogFile is -1 or a kernel FD for an open log file segment. * openLogFile is -1 or a kernel FD for an open log file segment.
...@@ -536,22 +542,28 @@ static XLogSegNo readSegNo = 0; ...@@ -536,22 +542,28 @@ static XLogSegNo readSegNo = 0;
static uint32 readOff = 0; static uint32 readOff = 0;
static uint32 readLen = 0; static uint32 readLen = 0;
static bool readFileHeaderValidated = false; static bool readFileHeaderValidated = false;
static int readSource = 0; /* XLOG_FROM_* code */ static XLogSource readSource = 0; /* XLOG_FROM_* code */
/* /*
* Keeps track of which sources we've tried to read the current WAL * Keeps track of which source we're currently reading from. This is
* record from and failed. * different from readSource in that this is always set, even when we don't
* currently have a WAL file open. If lastSourceFailed is set, our last
* attempt to read from currentSource failed, and we should try another source
* next.
*/ */
static int failedSources = 0; /* OR of XLOG_FROM_* codes */ static XLogSource currentSource = 0; /* XLOG_FROM_* code */
static bool lastSourceFailed = false;
/* /*
* These variables track when we last obtained some WAL data to process, * These variables track when we last obtained some WAL data to process,
* and where we got it from. (XLogReceiptSource is initially the same as * and where we got it from. (XLogReceiptSource is initially the same as
* readSource, but readSource gets reset to zero when we don't have data * readSource, but readSource gets reset to zero when we don't have data
* to process right now.) * to process right now. It is also different from currentSource, which
* also changes when we try to read from a source and fail, while
* XLogReceiptSource tracks where we last successfully read some WAL.)
*/ */
static TimestampTz XLogReceiptTime = 0; static TimestampTz XLogReceiptTime = 0;
static int XLogReceiptSource = 0; /* XLOG_FROM_* code */ static XLogSource XLogReceiptSource = 0; /* XLOG_FROM_* code */
/* Buffer for currently read page (XLOG_BLCKSZ bytes) */ /* Buffer for currently read page (XLOG_BLCKSZ bytes) */
static char *readBuf = NULL; static char *readBuf = NULL;
...@@ -605,7 +617,7 @@ static bool InstallXLogFileSegment(XLogSegNo *segno, char *tmppath, ...@@ -605,7 +617,7 @@ static bool InstallXLogFileSegment(XLogSegNo *segno, char *tmppath,
bool use_lock); bool use_lock);
static int XLogFileRead(XLogSegNo segno, int emode, TimeLineID tli, static int XLogFileRead(XLogSegNo segno, int emode, TimeLineID tli,
int source, bool notexistOk); int source, bool notexistOk);
static int XLogFileReadAnyTLI(XLogSegNo segno, int emode, int sources); static int XLogFileReadAnyTLI(XLogSegNo segno, int emode, int source);
static bool XLogPageRead(XLogRecPtr *RecPtr, int emode, bool fetching_ckpt, static bool XLogPageRead(XLogRecPtr *RecPtr, int emode, bool fetching_ckpt,
bool randAccess); bool randAccess);
static bool WaitForWALToBecomeAvailable(XLogRecPtr RecPtr, bool randAccess, static bool WaitForWALToBecomeAvailable(XLogRecPtr RecPtr, bool randAccess,
...@@ -2551,7 +2563,7 @@ XLogFileOpen(XLogSegNo segno) ...@@ -2551,7 +2563,7 @@ XLogFileOpen(XLogSegNo segno)
/* /*
* Open a logfile segment for reading (during recovery). * Open a logfile segment for reading (during recovery).
* *
* If source = XLOG_FROM_ARCHIVE, the segment is retrieved from archive. * If source == XLOG_FROM_ARCHIVE, the segment is retrieved from archive.
* Otherwise, it's assumed to be already available in pg_xlog. * Otherwise, it's assumed to be already available in pg_xlog.
*/ */
static int static int
...@@ -2697,7 +2709,7 @@ XLogFileRead(XLogSegNo segno, int emode, TimeLineID tli, ...@@ -2697,7 +2709,7 @@ XLogFileRead(XLogSegNo segno, int emode, TimeLineID tli,
* This version searches for the segment with any TLI listed in expectedTLIs. * This version searches for the segment with any TLI listed in expectedTLIs.
*/ */
static int static int
XLogFileReadAnyTLI(XLogSegNo segno, int emode, int sources) XLogFileReadAnyTLI(XLogSegNo segno, int emode, int source)
{ {
char path[MAXPGPATH]; char path[MAXPGPATH];
ListCell *cell; ListCell *cell;
...@@ -2720,7 +2732,7 @@ XLogFileReadAnyTLI(XLogSegNo segno, int emode, int sources) ...@@ -2720,7 +2732,7 @@ XLogFileReadAnyTLI(XLogSegNo segno, int emode, int sources)
if (tli < curFileTLI) if (tli < curFileTLI)
break; /* don't bother looking at too-old TLIs */ break; /* don't bother looking at too-old TLIs */
if (sources & XLOG_FROM_ARCHIVE) if (source == XLOG_FROM_ANY || source == XLOG_FROM_ARCHIVE)
{ {
fd = XLogFileRead(segno, emode, tli, XLOG_FROM_ARCHIVE, true); fd = XLogFileRead(segno, emode, tli, XLOG_FROM_ARCHIVE, true);
if (fd != -1) if (fd != -1)
...@@ -2730,7 +2742,7 @@ XLogFileReadAnyTLI(XLogSegNo segno, int emode, int sources) ...@@ -2730,7 +2742,7 @@ XLogFileReadAnyTLI(XLogSegNo segno, int emode, int sources)
} }
} }
if (sources & XLOG_FROM_PG_XLOG) if (source == XLOG_FROM_ANY || source == XLOG_FROM_PG_XLOG)
{ {
fd = XLogFileRead(segno, emode, tli, XLOG_FROM_PG_XLOG, true); fd = XLogFileRead(segno, emode, tli, XLOG_FROM_PG_XLOG, true);
if (fd != -1) if (fd != -1)
...@@ -3332,7 +3344,7 @@ ReadRecord(XLogRecPtr *RecPtr, int emode, bool fetching_ckpt) ...@@ -3332,7 +3344,7 @@ ReadRecord(XLogRecPtr *RecPtr, int emode, bool fetching_ckpt)
} }
/* This is the first try to read this page. */ /* This is the first try to read this page. */
failedSources = 0; lastSourceFailed = false;
retry: retry:
/* Read the page containing the record */ /* Read the page containing the record */
if (!XLogPageRead(RecPtr, emode, fetching_ckpt, randAccess)) if (!XLogPageRead(RecPtr, emode, fetching_ckpt, randAccess))
...@@ -3545,7 +3557,7 @@ retry: ...@@ -3545,7 +3557,7 @@ retry:
return record; return record;
next_record_is_invalid: next_record_is_invalid:
failedSources |= readSource; lastSourceFailed = true;
if (readFile >= 0) if (readFile >= 0)
{ {
...@@ -9162,7 +9174,7 @@ CancelBackup(void) ...@@ -9162,7 +9174,7 @@ CancelBackup(void)
* In standby mode, if after a successful return of XLogPageRead() the * In standby mode, if after a successful return of XLogPageRead() the
* caller finds the record it's interested in to be broken, it should * caller finds the record it's interested in to be broken, it should
* ereport the error with the level determined by * ereport the error with the level determined by
* emode_for_corrupt_record(), and then set "failedSources |= readSource" * emode_for_corrupt_record(), and then set lastSourceFailed
* and call XLogPageRead() again with the same arguments. This lets * and call XLogPageRead() again with the same arguments. This lets
* XLogPageRead() to try fetching the record from another source, or to * XLogPageRead() to try fetching the record from another source, or to
* sleep and retry. * sleep and retry.
...@@ -9180,7 +9192,7 @@ XLogPageRead(XLogRecPtr *RecPtr, int emode, bool fetching_ckpt, ...@@ -9180,7 +9192,7 @@ XLogPageRead(XLogRecPtr *RecPtr, int emode, bool fetching_ckpt,
targetRecOff = (*RecPtr) % XLOG_BLCKSZ; targetRecOff = (*RecPtr) % XLOG_BLCKSZ;
/* Fast exit if we have read the record in the current buffer already */ /* Fast exit if we have read the record in the current buffer already */
if (failedSources == 0 && targetSegNo == readSegNo && if (!lastSourceFailed && targetSegNo == readSegNo &&
targetPageOff == readOff && targetRecOff < readLen) targetPageOff == readOff && targetRecOff < readLen)
return true; return true;
...@@ -9227,17 +9239,18 @@ retry: ...@@ -9227,17 +9239,18 @@ retry:
/* In archive or crash recovery. */ /* In archive or crash recovery. */
if (readFile < 0) if (readFile < 0)
{ {
int sources; int source;
/* Reset curFileTLI if random fetch. */ /* Reset curFileTLI if random fetch. */
if (randAccess) if (randAccess)
curFileTLI = 0; curFileTLI = 0;
sources = XLOG_FROM_PG_XLOG;
if (InArchiveRecovery) if (InArchiveRecovery)
sources |= XLOG_FROM_ARCHIVE; source = XLOG_FROM_ANY;
else
source = XLOG_FROM_PG_XLOG;
readFile = XLogFileReadAnyTLI(readSegNo, emode, sources); readFile = XLogFileReadAnyTLI(readSegNo, emode, source);
if (readFile < 0) if (readFile < 0)
return false; return false;
} }
...@@ -9326,7 +9339,7 @@ retry: ...@@ -9326,7 +9339,7 @@ retry:
return true; return true;
next_record_is_invalid: next_record_is_invalid:
failedSources |= readSource; lastSourceFailed = true;
if (readFile >= 0) if (readFile >= 0)
close(readFile); close(readFile);
...@@ -9366,33 +9379,208 @@ WaitForWALToBecomeAvailable(XLogRecPtr RecPtr, bool randAccess, ...@@ -9366,33 +9379,208 @@ WaitForWALToBecomeAvailable(XLogRecPtr RecPtr, bool randAccess,
bool fetching_ckpt) bool fetching_ckpt)
{ {
static pg_time_t last_fail_time = 0; static pg_time_t last_fail_time = 0;
pg_time_t now;
/*-------
* Standby mode is implemented by a state machine:
*
* 1. Read from archive (XLOG_FROM_ARCHIVE)
* 2. Read from pg_xlog (XLOG_FROM_PG_XLOG)
* 3. Check trigger file
* 4. Read from primary server via walreceiver (XLOG_FROM_STREAM)
* 5. Rescan timelines
* 6. Sleep 5 seconds, and loop back to 1.
*
* Failure to read from the current source advances the state machine to
* the next state. In addition, successfully reading a file from pg_xlog
* moves the state machine from state 2 back to state 1 (we always prefer
* files in the archive over files in pg_xlog).
*
* 'currentSource' indicates the current state. There are no currentSource
* values for "check trigger", "rescan timelines", and "sleep" states,
* those actions are taken when reading from the previous source fails, as
* part of advancing to the next state.
*-------
*/
if (currentSource == 0)
currentSource = XLOG_FROM_ARCHIVE;
for (;;) for (;;)
{ {
int oldSource = currentSource;
/*
* First check if we failed to read from the current source, and
* advance the state machine if so. The failure to read might've
* happened outside this function, e.g when a CRC check fails on a
* record, or within this loop.
*/
if (lastSourceFailed)
{
switch (currentSource)
{
case XLOG_FROM_ARCHIVE:
currentSource = XLOG_FROM_PG_XLOG;
break;
case XLOG_FROM_PG_XLOG:
/*
* Check to see if the trigger file exists. Note that we do
* this only after failure, so when you create the trigger
* file, we still finish replaying as much as we can from
* archive and pg_xlog before failover.
*/
if (CheckForStandbyTrigger())
return false;
/*
* If primary_conninfo is set, launch walreceiver to try to
* stream the missing WAL.
*
* If fetching_ckpt is TRUE, RecPtr points to the initial
* checkpoint location. In that case, we use RedoStartLSN
* as the streaming start position instead of RecPtr, so
* that when we later jump backwards to start redo at
* RedoStartLSN, we will have the logs streamed already.
*/
if (PrimaryConnInfo)
{
XLogRecPtr ptr = fetching_ckpt ? RedoStartLSN : RecPtr;
RequestXLogStreaming(ptr, PrimaryConnInfo);
}
/*
* Move to XLOG_FROM_STREAM state in either case. We'll get
* immediate failure if we didn't launch walreceiver, and
* move on to the next state.
*/
currentSource = XLOG_FROM_STREAM;
break;
case XLOG_FROM_STREAM:
/*
* Failure while streaming. Most likely, we got here because
* streaming replication was terminated, or promotion was
* triggered. But we also get here if we find an invalid
* record in the WAL streamed from master, in which case
* something is seriously wrong. There's little chance that
* the problem will just go away, but PANIC is not good for
* availability either, especially in hot standby mode. So,
* we treat that the same as disconnection, and retry from
* archive/pg_xlog again. The WAL in the archive should be
* identical to what was streamed, so it's unlikely that it
* helps, but one can hope...
*/
/*
* Before we leave XLOG_FROM_STREAM state, make sure that
* walreceiver is not running, so that it won't overwrite
* any WAL that we restore from archive.
*/
if (WalRcvInProgress()) if (WalRcvInProgress())
ShutdownWalRcv();
/*
* Before we sleep, re-scan for possible new timelines if
* we were requested to recover to the latest timeline.
*/
if (recoveryTargetIsLatest)
{
if (rescanLatestTimeLine())
{
currentSource = XLOG_FROM_ARCHIVE;
break;
}
}
/*
* XLOG_FROM_STREAM is the last state in our state machine,
* so we've exhausted all the options for obtaining the
* requested WAL. We're going to loop back and retry from
* the archive, but if it hasn't been long since last
* attempt, sleep 5 seconds to avoid busy-waiting.
*/
now = (pg_time_t) time(NULL);
if ((now - last_fail_time) < 5)
{
pg_usleep(1000000L * (5 - (now - last_fail_time)));
now = (pg_time_t) time(NULL);
}
last_fail_time = now;
currentSource = XLOG_FROM_ARCHIVE;
break;
default:
elog(ERROR, "unexpected WAL source %d", currentSource);
}
}
else if (currentSource == XLOG_FROM_PG_XLOG)
{
/*
* We just successfully read a file in pg_xlog. We prefer files
* in the archive over ones in pg_xlog, so try the next file
* again from the archive first.
*/
currentSource = XLOG_FROM_ARCHIVE;
}
if (currentSource != oldSource)
elog(LOG, "switched WAL source from %s to %s after %s",
xlogSourceNames[oldSource], xlogSourceNames[currentSource],
lastSourceFailed ? "failure" : "success");
/*
* We've now handled possible failure. Try to read from the chosen
* source.
*/
lastSourceFailed = false;
switch (currentSource)
{
case XLOG_FROM_ARCHIVE:
case XLOG_FROM_PG_XLOG:
/* Close any old file we might have open. */
if (readFile >= 0)
{
close(readFile);
readFile = -1;
}
/* Reset curFileTLI if random fetch. */
if (randAccess)
curFileTLI = 0;
/*
* Try to restore the file from archive, or read an existing
* file from pg_xlog.
*/
readFile = XLogFileReadAnyTLI(readSegNo, DEBUG2, currentSource);
if (readFile >= 0)
return true; /* success! */
/*
* Nope, not found in archive or pg_xlog.
*/
lastSourceFailed = true;
break;
case XLOG_FROM_STREAM:
{ {
bool havedata; bool havedata;
/* /*
* If we find an invalid record in the WAL streamed from master, * Check if WAL receiver is still active.
* something is seriously wrong. There's little chance that the
* problem will just go away, but PANIC is not good for
* availability either, especially in hot standby mode.
* Disconnect, and retry from archive/pg_xlog again. The WAL in
* the archive should be identical to what was streamed, so it's
* unlikely that it helps, but one can hope...
*/ */
if (failedSources & XLOG_FROM_STREAM) if (!WalRcvInProgress())
{ {
ShutdownWalRcv(); lastSourceFailed = true;
continue; break;
} }
/* /*
* Walreceiver is active, so see if new data has arrived. * Walreceiver is active, so see if new data has arrived.
* *
* We only advance XLogReceiptTime when we obtain fresh WAL from * We only advance XLogReceiptTime when we obtain fresh WAL
* walreceiver and observe that we had already processed * from walreceiver and observe that we had already processed
* everything before the most recent "chunk" that it flushed to * everything before the most recent "chunk" that it flushed to
* disk. In steady state where we are keeping up with the * disk. In steady state where we are keeping up with the
* incoming data, XLogReceiptTime will be updated on each cycle. * incoming data, XLogReceiptTime will be updated on each cycle.
...@@ -9421,9 +9609,9 @@ WaitForWALToBecomeAvailable(XLogRecPtr RecPtr, bool randAccess, ...@@ -9421,9 +9609,9 @@ WaitForWALToBecomeAvailable(XLogRecPtr RecPtr, bool randAccess,
if (havedata) if (havedata)
{ {
/* /*
* Great, streamed far enough. Open the file if it's not open * Great, streamed far enough. Open the file if it's not
* already. Use XLOG_FROM_STREAM so that source info is set * open already. Use XLOG_FROM_STREAM so that source info
* correctly and XLogReceiptTime isn't changed. * is set correctly and XLogReceiptTime isn't changed.
*/ */
if (readFile < 0) if (readFile < 0)
{ {
...@@ -9437,114 +9625,43 @@ WaitForWALToBecomeAvailable(XLogRecPtr RecPtr, bool randAccess, ...@@ -9437,114 +9625,43 @@ WaitForWALToBecomeAvailable(XLogRecPtr RecPtr, bool randAccess,
/* just make sure source info is correct... */ /* just make sure source info is correct... */
readSource = XLOG_FROM_STREAM; readSource = XLOG_FROM_STREAM;
XLogReceiptSource = XLOG_FROM_STREAM; XLogReceiptSource = XLOG_FROM_STREAM;
return true;
} }
break; break;
} }
/* /*
* Data not here yet, so check for trigger then sleep for five * Data not here yet. Check for trigger, then wait for
* seconds like in the WAL file polling case below. * walreceiver to wake us up when new WAL arrives.
*/ */
if (CheckForStandbyTrigger()) if (CheckForStandbyTrigger())
return false;
/*
* Wait for more WAL to arrive, or timeout to be reached
*/
WaitLatch(&XLogCtl->recoveryWakeupLatch,
WL_LATCH_SET | WL_TIMEOUT,
5000L);
ResetLatch(&XLogCtl->recoveryWakeupLatch);
}
else
{
/*
* WAL receiver is not active. Poll the archive.
*/
int sources;
pg_time_t now;
if (readFile >= 0)
{ {
close(readFile);
readFile = -1;
}
/* Reset curFileTLI if random fetch. */
if (randAccess)
curFileTLI = 0;
/* /*
* Try to restore the file from archive, or read an existing file * Note that we don't "return false" immediately here.
* from pg_xlog. * After being triggered, we still want to replay all the
* WAL that was already streamed. It's in pg_xlog now, so
* we just treat this as a failure, and the state machine
* will move on to replay the streamed WAL from pg_xlog,
* and then recheck the trigger and exit replay.
*/ */
sources = XLOG_FROM_ARCHIVE | XLOG_FROM_PG_XLOG; lastSourceFailed = true;
if (!(sources & ~failedSources)) break;
{
/*
* We've exhausted all options for retrieving the file. Retry.
*/
failedSources = 0;
/*
* Before we sleep, re-scan for possible new timelines if we
* were requested to recover to the latest timeline.
*/
if (recoveryTargetIsLatest)
{
if (rescanLatestTimeLine())
continue;
}
/*
* If it hasn't been long since last attempt, sleep to avoid
* busy-waiting.
*/
now = (pg_time_t) time(NULL);
if ((now - last_fail_time) < 5)
{
pg_usleep(1000000L * (5 - (now - last_fail_time)));
now = (pg_time_t) time(NULL);
} }
last_fail_time = now;
/* /*
* If primary_conninfo is set, launch walreceiver to try to * Wait for more WAL to arrive. Time out after 5 seconds, like
* stream the missing WAL, before retrying to restore from * when polling the archive, to react to a trigger file
* archive/pg_xlog. * promptly.
*
* If fetching_ckpt is TRUE, RecPtr points to the initial
* checkpoint location. In that case, we use RedoStartLSN as
* the streaming start position instead of RecPtr, so that
* when we later jump backwards to start redo at RedoStartLSN,
* we will have the logs streamed already.
*/ */
if (PrimaryConnInfo) WaitLatch(&XLogCtl->recoveryWakeupLatch,
{ WL_LATCH_SET | WL_TIMEOUT,
XLogRecPtr ptr = fetching_ckpt ? RedoStartLSN : RecPtr; 5000L);
ResetLatch(&XLogCtl->recoveryWakeupLatch);
RequestXLogStreaming(ptr, PrimaryConnInfo);
continue;
}
}
/* Don't try to read from a source that just failed */
sources &= ~failedSources;
readFile = XLogFileReadAnyTLI(readSegNo, DEBUG2, sources);
if (readFile >= 0)
break; break;
}
/* default:
* Nope, not found in archive and/or pg_xlog. elog(ERROR, "unexpected WAL source %d", currentSource);
*/
failedSources |= sources;
/*
* Check to see if the trigger file exists. Note that we do this
* only after failure, so when you create the trigger file, we
* still finish replaying as much as we can from archive and
* pg_xlog before failover.
*/
if (CheckForStandbyTrigger())
return false;
} }
/* /*
...@@ -9554,7 +9671,7 @@ WaitForWALToBecomeAvailable(XLogRecPtr RecPtr, bool randAccess, ...@@ -9554,7 +9671,7 @@ WaitForWALToBecomeAvailable(XLogRecPtr RecPtr, bool randAccess,
HandleStartupProcInterrupts(); HandleStartupProcInterrupts();
} }
return true; return false; /* not reached */
} }
/* /*
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment