Commit ff8f160b authored by Heikki Linnakangas

Move the logic that waits for WAL in standby mode into a separate function.

This is just refactoring with no user-visible effect, to make the code more
readable.
parent f7491616
...@@ -157,6 +157,9 @@ HotStandbyState standbyState = STANDBY_DISABLED; ...@@ -157,6 +157,9 @@ HotStandbyState standbyState = STANDBY_DISABLED;
static XLogRecPtr LastRec; static XLogRecPtr LastRec;
/* Local copy of WalRcv->receivedUpto */
static XLogRecPtr receivedUpto = 0;
/* /*
* During recovery, lastFullPageWrites keeps track of full_page_writes that * During recovery, lastFullPageWrites keeps track of full_page_writes that
* the replayed WAL records indicate. It's initialized with full_page_writes * the replayed WAL records indicate. It's initialized with full_page_writes
...@@ -538,6 +541,7 @@ static int readFile = -1; ...@@ -538,6 +541,7 @@ static int readFile = -1;
static XLogSegNo readSegNo = 0; static XLogSegNo readSegNo = 0;
static uint32 readOff = 0; static uint32 readOff = 0;
static uint32 readLen = 0; static uint32 readLen = 0;
static bool readFileHeaderValidated = false;
static int readSource = 0; /* XLOG_FROM_* code */ static int readSource = 0; /* XLOG_FROM_* code */
/* /*
...@@ -628,6 +632,8 @@ static int XLogFileRead(XLogSegNo segno, int emode, TimeLineID tli, ...@@ -628,6 +632,8 @@ static int XLogFileRead(XLogSegNo segno, int emode, TimeLineID tli,
static int XLogFileReadAnyTLI(XLogSegNo segno, int emode, int sources); static int XLogFileReadAnyTLI(XLogSegNo segno, int emode, int sources);
static bool XLogPageRead(XLogRecPtr *RecPtr, int emode, bool fetching_ckpt, static bool XLogPageRead(XLogRecPtr *RecPtr, int emode, bool fetching_ckpt,
bool randAccess); bool randAccess);
static bool WaitForWALToBecomeAvailable(XLogRecPtr RecPtr, bool randAccess,
bool fetching_ckpt);
static int emode_for_corrupt_record(int emode, XLogRecPtr RecPtr); static int emode_for_corrupt_record(int emode, XLogRecPtr RecPtr);
static void XLogFileClose(void); static void XLogFileClose(void);
static void PreallocXlogFiles(XLogRecPtr endptr); static void PreallocXlogFiles(XLogRecPtr endptr);
...@@ -2685,6 +2691,9 @@ XLogFileRead(XLogSegNo segno, int emode, TimeLineID tli, ...@@ -2685,6 +2691,9 @@ XLogFileRead(XLogSegNo segno, int emode, TimeLineID tli,
if (source != XLOG_FROM_STREAM) if (source != XLOG_FROM_STREAM)
XLogReceiptTime = GetCurrentTimestamp(); XLogReceiptTime = GetCurrentTimestamp();
/* The file header needs to be validated on first access */
readFileHeaderValidated = false;
return fd; return fd;
} }
if (errno != ENOENT || !notfoundOk) /* unexpected failure? */ if (errno != ENOENT || !notfoundOk) /* unexpected failure? */
...@@ -9233,12 +9242,9 @@ static bool ...@@ -9233,12 +9242,9 @@ static bool
XLogPageRead(XLogRecPtr *RecPtr, int emode, bool fetching_ckpt, XLogPageRead(XLogRecPtr *RecPtr, int emode, bool fetching_ckpt,
bool randAccess) bool randAccess)
{ {
static XLogRecPtr receivedUpto = 0;
bool switched_segment = false;
uint32 targetPageOff; uint32 targetPageOff;
uint32 targetRecOff; uint32 targetRecOff;
XLogSegNo targetSegNo; XLogSegNo targetSegNo;
static pg_time_t last_fail_time = 0;
XLByteToSeg(*RecPtr, targetSegNo); XLByteToSeg(*RecPtr, targetSegNo);
targetPageOff = (((*RecPtr) % XLogSegSize) / XLOG_BLCKSZ) * XLOG_BLCKSZ; targetPageOff = (((*RecPtr) % XLogSegSize) / XLOG_BLCKSZ) * XLOG_BLCKSZ;
...@@ -9282,12 +9288,156 @@ retry: ...@@ -9282,12 +9288,156 @@ retry:
(readSource == XLOG_FROM_STREAM && !XLByteLT(*RecPtr, receivedUpto))) (readSource == XLOG_FROM_STREAM && !XLByteLT(*RecPtr, receivedUpto)))
{ {
if (StandbyMode) if (StandbyMode)
{
if (!WaitForWALToBecomeAvailable(*RecPtr, randAccess,
fetching_ckpt))
goto triggered;
}
else
{
/* In archive or crash recovery. */
if (readFile < 0)
{
int sources;
/* Reset curFileTLI if random fetch. */
if (randAccess)
curFileTLI = 0;
sources = XLOG_FROM_PG_XLOG;
if (InArchiveRecovery)
sources |= XLOG_FROM_ARCHIVE;
readFile = XLogFileReadAnyTLI(readSegNo, emode, sources);
if (readFile < 0)
return false;
}
}
}
/*
* At this point, we have the right segment open and if we're streaming we
* know the requested record is in it.
*/
Assert(readFile != -1);
/*
* If the current segment is being streamed from master, calculate how
* much of the current page we have received already. We know the
* requested record has been received, but this is for the benefit of
* future calls, to allow quick exit at the top of this function.
*/
if (readSource == XLOG_FROM_STREAM)
{
if (((*RecPtr) / XLOG_BLCKSZ) != (receivedUpto / XLOG_BLCKSZ))
{
readLen = XLOG_BLCKSZ;
}
else
readLen = receivedUpto % XLogSegSize - targetPageOff;
}
else
readLen = XLOG_BLCKSZ;
if (!readFileHeaderValidated && targetPageOff != 0)
{ {
/* /*
* In standby mode, wait for the requested record to become * Whenever switching to a new WAL segment, we read the first page of
* available, either via restore_command succeeding to restore the * the file and validate its header, even if that's not where the
* segment, or via walreceiver having streamed the record. * target record is. This is so that we can check the additional
* identification info that is present in the first page's "long"
* header.
*/ */
readOff = 0;
if (read(readFile, readBuf, XLOG_BLCKSZ) != XLOG_BLCKSZ)
{
char fname[MAXFNAMELEN];
XLogFileName(fname, curFileTLI, readSegNo);
ereport(emode_for_corrupt_record(emode, *RecPtr),
(errcode_for_file_access(),
errmsg("could not read from log segment %s, offset %u: %m",
fname, readOff)));
goto next_record_is_invalid;
}
if (!ValidXLogPageHeader((XLogPageHeader) readBuf, emode))
goto next_record_is_invalid;
}
/* Read the requested page */
readOff = targetPageOff;
if (lseek(readFile, (off_t) readOff, SEEK_SET) < 0)
{
char fname[MAXFNAMELEN];
XLogFileName(fname, curFileTLI, readSegNo);
ereport(emode_for_corrupt_record(emode, *RecPtr),
(errcode_for_file_access(),
errmsg("could not seek in log segment %s to offset %u: %m",
fname, readOff)));
goto next_record_is_invalid;
}
if (read(readFile, readBuf, XLOG_BLCKSZ) != XLOG_BLCKSZ)
{
char fname[MAXFNAMELEN];
XLogFileName(fname, curFileTLI, readSegNo);
ereport(emode_for_corrupt_record(emode, *RecPtr),
(errcode_for_file_access(),
errmsg("could not read from log segment %s, offset %u: %m",
fname, readOff)));
goto next_record_is_invalid;
}
if (!ValidXLogPageHeader((XLogPageHeader) readBuf, emode))
goto next_record_is_invalid;
readFileHeaderValidated = true;
Assert(targetSegNo == readSegNo);
Assert(targetPageOff == readOff);
Assert(targetRecOff < readLen);
return true;
next_record_is_invalid:
failedSources |= readSource;
if (readFile >= 0)
close(readFile);
readFile = -1;
readLen = 0;
readSource = 0;
/* In standby-mode, keep trying */
if (StandbyMode)
goto retry;
else
return false;
triggered:
if (readFile >= 0)
close(readFile);
readFile = -1;
readLen = 0;
readSource = 0;
return false;
}
/*
* In standby mode, wait for the requested record to become available, either
* via restore_command succeeding to restore the segment, or via walreceiver
* having streamed the record (or via someone copying the segment directly to
* pg_xlog, but that is not documented or recommended).
*
* When the requested record becomes available, the function opens the file
* containing it (if not open already), and returns true. When end of standby
* mode is triggered by the user, and there is no more WAL available, returns
* false.
*/
static bool
WaitForWALToBecomeAvailable(XLogRecPtr RecPtr, bool randAccess,
bool fetching_ckpt)
{
static pg_time_t last_fail_time = 0;
for (;;) for (;;)
{ {
if (WalRcvInProgress()) if (WalRcvInProgress())
...@@ -9295,14 +9445,13 @@ retry: ...@@ -9295,14 +9445,13 @@ retry:
bool havedata; bool havedata;
/* /*
* If we find an invalid record in the WAL streamed from * If we find an invalid record in the WAL streamed from master,
* master, something is seriously wrong. There's little * something is seriously wrong. There's little chance that the
* chance that the problem will just go away, but PANIC is * problem will just go away, but PANIC is not good for
* not good for availability either, especially in hot * availability either, especially in hot standby mode.
* standby mode. Disconnect, and retry from * Disconnect, and retry from archive/pg_xlog again. The WAL in
* archive/pg_xlog again. The WAL in the archive should be * the archive should be identical to what was streamed, so it's
* identical to what was streamed, so it's unlikely that * unlikely that it helps, but one can hope...
* it helps, but one can hope...
*/ */
if (failedSources & XLOG_FROM_STREAM) if (failedSources & XLOG_FROM_STREAM)
{ {
...@@ -9313,26 +9462,25 @@ retry: ...@@ -9313,26 +9462,25 @@ retry:
/* /*
* Walreceiver is active, so see if new data has arrived. * Walreceiver is active, so see if new data has arrived.
* *
* We only advance XLogReceiptTime when we obtain fresh * We only advance XLogReceiptTime when we obtain fresh WAL from
* WAL from walreceiver and observe that we had already * walreceiver and observe that we had already processed
* processed everything before the most recent "chunk" * everything before the most recent "chunk" that it flushed to
* that it flushed to disk. In steady state where we are * disk. In steady state where we are keeping up with the
* keeping up with the incoming data, XLogReceiptTime will * incoming data, XLogReceiptTime will be updated on each cycle.
* be updated on each cycle. When we are behind, * When we are behind, XLogReceiptTime will not advance, so the
* XLogReceiptTime will not advance, so the grace time * grace time allotted to conflicting queries will decrease.
* alloted to conflicting queries will decrease.
*/ */
if (XLByteLT(*RecPtr, receivedUpto)) if (XLByteLT(RecPtr, receivedUpto))
havedata = true; havedata = true;
else else
{ {
XLogRecPtr latestChunkStart; XLogRecPtr latestChunkStart;
receivedUpto = GetWalRcvWriteRecPtr(&latestChunkStart); receivedUpto = GetWalRcvWriteRecPtr(&latestChunkStart);
if (XLByteLT(*RecPtr, receivedUpto)) if (XLByteLT(RecPtr, receivedUpto))
{ {
havedata = true; havedata = true;
if (!XLByteLT(*RecPtr, latestChunkStart)) if (!XLByteLT(RecPtr, latestChunkStart))
{ {
XLogReceiptTime = GetCurrentTimestamp(); XLogReceiptTime = GetCurrentTimestamp();
SetCurrentChunkStartTime(XLogReceiptTime); SetCurrentChunkStartTime(XLogReceiptTime);
...@@ -9344,19 +9492,16 @@ retry: ...@@ -9344,19 +9492,16 @@ retry:
if (havedata) if (havedata)
{ {
/* /*
* Great, streamed far enough. Open the file if it's * Great, streamed far enough. Open the file if it's not open
* not open already. Use XLOG_FROM_STREAM so that * already. Use XLOG_FROM_STREAM so that source info is set
* source info is set correctly and XLogReceiptTime * correctly and XLogReceiptTime isn't changed.
* isn't changed.
*/ */
if (readFile < 0) if (readFile < 0)
{ {
readFile = readFile = XLogFileRead(readSegNo, PANIC,
XLogFileRead(readSegNo, PANIC, curFileTLI,
recoveryTargetTLI,
XLOG_FROM_STREAM, false); XLOG_FROM_STREAM, false);
Assert(readFile >= 0); Assert(readFile >= 0);
switched_segment = true;
} }
else else
{ {
...@@ -9368,11 +9513,11 @@ retry: ...@@ -9368,11 +9513,11 @@ retry:
} }
/* /*
* Data not here yet, so check for trigger then sleep for * Data not here yet, so check for trigger then sleep for five
* five seconds like in the WAL file polling case below. * seconds like in the WAL file polling case below.
*/ */
if (CheckForStandbyTrigger()) if (CheckForStandbyTrigger())
goto retry; return false;
/* /*
* Wait for more WAL to arrive, or timeout to be reached * Wait for more WAL to arrive, or timeout to be reached
...@@ -9384,13 +9529,12 @@ retry: ...@@ -9384,13 +9529,12 @@ retry:
} }
else else
{ {
/*
* WAL receiver is not active. Poll the archive.
*/
int sources; int sources;
pg_time_t now; pg_time_t now;
/*
* Until walreceiver manages to reconnect, poll the
* archive.
*/
if (readFile >= 0) if (readFile >= 0)
{ {
close(readFile); close(readFile);
...@@ -9401,22 +9545,20 @@ retry: ...@@ -9401,22 +9545,20 @@ retry:
curFileTLI = 0; curFileTLI = 0;
/* /*
* Try to restore the file from archive, or read an * Try to restore the file from archive, or read an existing file
* existing file from pg_xlog. * from pg_xlog.
*/ */
sources = XLOG_FROM_ARCHIVE | XLOG_FROM_PG_XLOG; sources = XLOG_FROM_ARCHIVE | XLOG_FROM_PG_XLOG;
if (!(sources & ~failedSources)) if (!(sources & ~failedSources))
{ {
/* /*
* We've exhausted all options for retrieving the * We've exhausted all options for retrieving the file. Retry.
* file. Retry.
*/ */
failedSources = 0; failedSources = 0;
/* /*
* Before we sleep, re-scan for possible new timelines * Before we sleep, re-scan for possible new timelines if we
* if we were requested to recover to the latest * were requested to recover to the latest timeline.
* timeline.
*/ */
if (recoveryTargetIsLatest) if (recoveryTargetIsLatest)
{ {
...@@ -9425,8 +9567,8 @@ retry: ...@@ -9425,8 +9567,8 @@ retry:
} }
/* /*
* If it hasn't been long since last attempt, sleep to * If it hasn't been long since last attempt, sleep to avoid
* avoid busy-waiting. * busy-waiting.
*/ */
now = (pg_time_t) time(NULL); now = (pg_time_t) time(NULL);
if ((now - last_fail_time) < 5) if ((now - last_fail_time) < 5)
...@@ -9437,30 +9579,27 @@ retry: ...@@ -9437,30 +9579,27 @@ retry:
last_fail_time = now; last_fail_time = now;
/* /*
* If primary_conninfo is set, launch walreceiver to * If primary_conninfo is set, launch walreceiver to try to
* try to stream the missing WAL, before retrying to * stream the missing WAL, before retrying to restore from
* restore from archive/pg_xlog. * archive/pg_xlog.
* *
* If fetching_ckpt is TRUE, RecPtr points to the * If fetching_ckpt is TRUE, RecPtr points to the initial
* initial checkpoint location. In that case, we use * checkpoint location. In that case, we use RedoStartLSN as
* RedoStartLSN as the streaming start position * the streaming start position instead of RecPtr, so that
* instead of RecPtr, so that when we later jump * when we later jump backwards to start redo at RedoStartLSN,
* backwards to start redo at RedoStartLSN, we will * we will have the logs streamed already.
* have the logs streamed already.
*/ */
if (PrimaryConnInfo) if (PrimaryConnInfo)
{ {
RequestXLogStreaming( XLogRecPtr ptr = fetching_ckpt ? RedoStartLSN : RecPtr;
fetching_ckpt ? RedoStartLSN : *RecPtr,
PrimaryConnInfo); RequestXLogStreaming(ptr, PrimaryConnInfo);
continue; continue;
} }
} }
/* Don't try to read from a source that just failed */ /* Don't try to read from a source that just failed */
sources &= ~failedSources; sources &= ~failedSources;
readFile = XLogFileReadAnyTLI(readSegNo, DEBUG2, readFile = XLogFileReadAnyTLI(readSegNo, DEBUG2, sources);
sources);
switched_segment = true;
if (readFile >= 0) if (readFile >= 0)
break; break;
...@@ -9470,147 +9609,23 @@ retry: ...@@ -9470,147 +9609,23 @@ retry:
failedSources |= sources; failedSources |= sources;
/* /*
* Check to see if the trigger file exists. Note that we * Check to see if the trigger file exists. Note that we do this
* do this only after failure, so when you create the * only after failure, so when you create the trigger file, we
* trigger file, we still finish replaying as much as we * still finish replaying as much as we can from archive and
* can from archive and pg_xlog before failover. * pg_xlog before failover.
*/ */
if (CheckForStandbyTrigger()) if (CheckForStandbyTrigger())
goto triggered;
}
/*
* This possibly-long loop needs to handle interrupts of
* startup process.
*/
HandleStartupProcInterrupts();
}
}
else
{
/* In archive or crash recovery. */
if (readFile < 0)
{
int sources;
/* Reset curFileTLI if random fetch. */
if (randAccess)
curFileTLI = 0;
sources = XLOG_FROM_PG_XLOG;
if (InArchiveRecovery)
sources |= XLOG_FROM_ARCHIVE;
readFile = XLogFileReadAnyTLI(readSegNo, emode, sources);
switched_segment = true;
if (readFile < 0)
return false; return false;
} }
}
}
/*
* At this point, we have the right segment open and if we're streaming we
* know the requested record is in it.
*/
Assert(readFile != -1);
/* /*
* If the current segment is being streamed from master, calculate how * This possibly-long loop needs to handle interrupts of startup
* much of the current page we have received already. We know the * process.
* requested record has been received, but this is for the benefit of
* future calls, to allow quick exit at the top of this function.
*/
if (readSource == XLOG_FROM_STREAM)
{
if (((*RecPtr) / XLOG_BLCKSZ) != (receivedUpto / XLOG_BLCKSZ))
{
readLen = XLOG_BLCKSZ;
}
else
readLen = receivedUpto % XLogSegSize - targetPageOff;
}
else
readLen = XLOG_BLCKSZ;
if (switched_segment && targetPageOff != 0)
{
/*
* Whenever switching to a new WAL segment, we read the first page of
* the file and validate its header, even if that's not where the
* target record is. This is so that we can check the additional
* identification info that is present in the first page's "long"
* header.
*/ */
readOff = 0; HandleStartupProcInterrupts();
if (read(readFile, readBuf, XLOG_BLCKSZ) != XLOG_BLCKSZ)
{
char fname[MAXFNAMELEN];
XLogFileName(fname, curFileTLI, readSegNo);
ereport(emode_for_corrupt_record(emode, *RecPtr),
(errcode_for_file_access(),
errmsg("could not read from log segment %s, offset %u: %m",
fname, readOff)));
goto next_record_is_invalid;
}
if (!ValidXLogPageHeader((XLogPageHeader) readBuf, emode))
goto next_record_is_invalid;
}
/* Read the requested page */
readOff = targetPageOff;
if (lseek(readFile, (off_t) readOff, SEEK_SET) < 0)
{
char fname[MAXFNAMELEN];
XLogFileName(fname, curFileTLI, readSegNo);
ereport(emode_for_corrupt_record(emode, *RecPtr),
(errcode_for_file_access(),
errmsg("could not seek in log segment %s to offset %u: %m",
fname, readOff)));
goto next_record_is_invalid;
}
if (read(readFile, readBuf, XLOG_BLCKSZ) != XLOG_BLCKSZ)
{
char fname[MAXFNAMELEN];
XLogFileName(fname, curFileTLI, readSegNo);
ereport(emode_for_corrupt_record(emode, *RecPtr),
(errcode_for_file_access(),
errmsg("could not read from log segment %s, offset %u: %m",
fname, readOff)));
goto next_record_is_invalid;
} }
if (!ValidXLogPageHeader((XLogPageHeader) readBuf, emode))
goto next_record_is_invalid;
Assert(targetSegNo == readSegNo);
Assert(targetPageOff == readOff);
Assert(targetRecOff < readLen);
return true; return true;
next_record_is_invalid:
failedSources |= readSource;
if (readFile >= 0)
close(readFile);
readFile = -1;
readLen = 0;
readSource = 0;
/* In standby-mode, keep trying */
if (StandbyMode)
goto retry;
else
return false;
triggered:
if (readFile >= 0)
close(readFile);
readFile = -1;
readLen = 0;
readSource = 0;
return false;
} }
/* /*
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment