Commit c945af80 authored by Heikki Linnakangas's avatar Heikki Linnakangas

Refactor checking whether we've reached the recovery target.

Makes the replay loop slightly more readable, by separating the concerns of
whether to stop and whether to delay, and how to extract the timestamp from
a record.

This has the user-visible change that the timestamp of the last applied
record is now updated after actually applying it. Before, it was updated
just before applying it. That meant that pg_last_xact_replay_timestamp()
could return the timestamp of a commit record that is in process of being
replayed, but not yet applied. Normally the difference is small, but if
min_recovery_apply_delay is set, there could be a significant delay between
reading a record and applying it.

Another behavioral change is that if you recover to a restore point, we stop
after the restore point record, not before it. It makes no difference as far
as running queries on the server is concerned, as applying a restore point
record changes nothing, but if examine the timeline history you will see
that the new timeline branched off just after the restore point record, not
before it. One practical consequence is that if you do PITR to the new
timeline, and set recovery target to the same named restore point again, it
will find and stop recovery at the same restore point. Conceptually, I think
it makes more sense to consider the restore point as part of the new
timeline's history than not.

In principle, setting the last-replayed timestamp before actually applying
the record was a bug all along, but it doesn't seem worth the risk to
backpatch, since min_recovery_apply_delay was only added in 9.4.
parent 10a3b165
......@@ -233,7 +233,10 @@ bool StandbyMode = false;
/* whether request for fast promotion has been made yet */
static bool fast_promote = false;
/* if recoveryStopsHere returns true, it saves actual stop xid/time/name here */
/*
* if recoveryStopsBefore/After returns true, it saves information of the stop
* point here
*/
static TransactionId recoveryStopXid;
static TimestampTz recoveryStopTime;
static char recoveryStopName[MAXFNAMELEN];
......@@ -731,10 +734,10 @@ static bool holdingAllSlots = false;
static void readRecoveryCommandFile(void);
static void exitArchiveRecovery(TimeLineID endTLI, XLogSegNo endLogSegNo);
static bool recoveryStopsHere(XLogRecord *record, bool *includeThis, bool *delayThis);
static bool recoveryStopsBefore(XLogRecord *record);
static bool recoveryStopsAfter(XLogRecord *record);
static void recoveryPausesHere(void);
static void recoveryApplyDelay(void);
static bool SetRecoveryDelayUntilTime(TimestampTz xtime);
static bool recoveryApplyDelay(XLogRecord *record);
static void SetLatestXTime(TimestampTz xtime);
static void SetCurrentChunkStartTime(TimestampTz xtime);
static void CheckRequiredParameterValues(void);
......@@ -5622,88 +5625,69 @@ exitArchiveRecovery(TimeLineID endTLI, XLogSegNo endLogSegNo)
}
/*
* For point-in-time recovery, this function decides whether we want to
* stop applying the XLOG at or after the current record.
* Extract timestamp from WAL record.
*
* Returns TRUE if we are stopping, FALSE otherwise. On TRUE return,
* *includeThis is set TRUE if we should apply this record before stopping.
*
* We also track the timestamp of the latest applied COMMIT/ABORT
* record in XLogCtl->recoveryLastXTime, for logging purposes.
* Also, some information is saved in recoveryStopXid et al for use in
* annotating the new timeline's history file; and recoveryDelayUntilTime
* is updated, for time-delayed standbys.
* If the record contains a timestamp, returns true, and saves the timestamp
* in *recordXtime. If the record type has no timestamp, returns false.
* Currently, only transaction commit/abort records and restore points contain
* timestamps.
*/
static bool
recoveryStopsHere(XLogRecord *record, bool *includeThis, bool *delayThis)
getRecordTimestamp(XLogRecord *record, TimestampTz *recordXtime)
{
bool stopsHere;
uint8 record_info;
TimestampTz recordXtime;
char recordRPName[MAXFNAMELEN];
uint8 record_info = record->xl_info & ~XLR_INFO_MASK;
/* We only consider stopping at COMMIT, ABORT or RESTORE POINT records */
if (record->xl_rmid != RM_XACT_ID && record->xl_rmid != RM_XLOG_ID)
return false;
record_info = record->xl_info & ~XLR_INFO_MASK;
if (record->xl_rmid == RM_XACT_ID && record_info == XLOG_XACT_COMMIT_COMPACT)
if (record->xl_rmid == RM_XLOG_ID && record_info == XLOG_RESTORE_POINT)
{
xl_xact_commit_compact *recordXactCommitData;
recordXactCommitData = (xl_xact_commit_compact *) XLogRecGetData(record);
recordXtime = recordXactCommitData->xact_time;
*delayThis = SetRecoveryDelayUntilTime(recordXactCommitData->xact_time);
*recordXtime = ((xl_restore_point *) XLogRecGetData(record))->rp_time;
return true;
}
else if (record->xl_rmid == RM_XACT_ID && record_info == XLOG_XACT_COMMIT)
if (record->xl_rmid == RM_XACT_ID && record_info == XLOG_XACT_COMMIT_COMPACT)
{
xl_xact_commit *recordXactCommitData;
recordXactCommitData = (xl_xact_commit *) XLogRecGetData(record);
recordXtime = recordXactCommitData->xact_time;
*delayThis = SetRecoveryDelayUntilTime(recordXactCommitData->xact_time);
*recordXtime = ((xl_xact_commit_compact *) XLogRecGetData(record))->xact_time;
return true;
}
else if (record->xl_rmid == RM_XACT_ID && record_info == XLOG_XACT_ABORT)
if (record->xl_rmid == RM_XACT_ID && record_info == XLOG_XACT_COMMIT)
{
xl_xact_abort *recordXactAbortData;
recordXactAbortData = (xl_xact_abort *) XLogRecGetData(record);
recordXtime = recordXactAbortData->xact_time;
/*
* We deliberately choose not to delay aborts since they have no
* effect on MVCC. We already allow replay of records that don't
* have a timestamp, so there is already opportunity for issues
* caused by early conflicts on standbys.
*/
*recordXtime = ((xl_xact_commit *) XLogRecGetData(record))->xact_time;
return true;
}
else if (record->xl_rmid == RM_XLOG_ID && record_info == XLOG_RESTORE_POINT)
if (record->xl_rmid == RM_XACT_ID && record_info == XLOG_XACT_ABORT)
{
xl_restore_point *recordRestorePointData;
recordRestorePointData = (xl_restore_point *) XLogRecGetData(record);
recordXtime = recordRestorePointData->rp_time;
strncpy(recordRPName, recordRestorePointData->rp_name, MAXFNAMELEN);
*delayThis = SetRecoveryDelayUntilTime(recordRestorePointData->rp_time);
*recordXtime = ((xl_xact_abort *) XLogRecGetData(record))->xact_time;
return true;
}
else
return false;
}
/* Do we have a PITR target at all? */
if (recoveryTarget == RECOVERY_TARGET_UNSET)
{
/*
* Save timestamp of latest transaction commit/abort if this is a
* transaction record
/*
* For point-in-time recovery, this function decides whether we want to
* stop applying the XLOG before the current record.
*
* Returns TRUE if we are stopping, FALSE otherwise. If stopping, some
* information is saved in recoveryStopXid et al for use in annotating the
* new timeline's history file.
*/
if (record->xl_rmid == RM_XACT_ID)
SetLatestXTime(recordXtime);
static bool
recoveryStopsBefore(XLogRecord *record)
{
bool stopsHere = false;
uint8 record_info;
bool isCommit;
TimestampTz recordXtime = 0;
/* We only consider stopping before COMMIT or ABORT records. */
if (record->xl_rmid != RM_XACT_ID)
return false;
record_info = record->xl_info & ~XLR_INFO_MASK;
if (record_info == XLOG_XACT_COMMIT_COMPACT || record_info == XLOG_XACT_COMMIT)
isCommit = true;
else if (record_info == XLOG_XACT_ABORT)
isCommit = false;
else
return false;
}
if (recoveryTarget == RECOVERY_TARGET_XID)
if (recoveryTarget == RECOVERY_TARGET_XID && !recoveryTargetInclusive)
{
/*
* There can be only one transaction end record with this exact
......@@ -5711,28 +5695,14 @@ recoveryStopsHere(XLogRecord *record, bool *includeThis, bool *delayThis)
*
* when testing for an xid, we MUST test for equality only, since
* transactions are numbered in the order they start, not the order
* they complete. A higher numbered xid will complete before you about
* 50% of the time...
* they complete. A higher numbered xid will complete before you
* about 50% of the time...
*/
stopsHere = (record->xl_xid == recoveryTargetXid);
if (stopsHere)
*includeThis = recoveryTargetInclusive;
}
else if (recoveryTarget == RECOVERY_TARGET_NAME)
{
/*
* There can be many restore points that share the same name, so we
* stop at the first one
*/
stopsHere = (strcmp(recordRPName, recoveryTargetName) == 0);
/*
* Ignore recoveryTargetInclusive because this is not a transaction
* record
*/
*includeThis = false;
}
else
if (recoveryTarget == RECOVERY_TARGET_TIME &&
getRecordTimestamp(record, &recordXtime))
{
/*
* There can be many transactions that share the same commit time, so
......@@ -5743,64 +5713,119 @@ recoveryStopsHere(XLogRecord *record, bool *includeThis, bool *delayThis)
stopsHere = (recordXtime > recoveryTargetTime);
else
stopsHere = (recordXtime >= recoveryTargetTime);
if (stopsHere)
*includeThis = false;
}
if (stopsHere)
{
recoveryStopAfter = false;
recoveryStopXid = record->xl_xid;
recoveryStopTime = recordXtime;
recoveryStopAfter = *includeThis;
recoveryStopName[0] = '\0';
if (record_info == XLOG_XACT_COMMIT_COMPACT || record_info == XLOG_XACT_COMMIT)
if (isCommit)
{
if (recoveryStopAfter)
ereport(LOG,
(errmsg("recovery stopping after commit of transaction %u, time %s",
recoveryStopXid,
timestamptz_to_str(recoveryStopTime))));
else
ereport(LOG,
(errmsg("recovery stopping before commit of transaction %u, time %s",
recoveryStopXid,
timestamptz_to_str(recoveryStopTime))));
}
else if (record_info == XLOG_XACT_ABORT)
{
if (recoveryStopAfter)
ereport(LOG,
(errmsg("recovery stopping after abort of transaction %u, time %s",
recoveryStopXid,
timestamptz_to_str(recoveryStopTime))));
else
{
ereport(LOG,
(errmsg("recovery stopping before abort of transaction %u, time %s",
recoveryStopXid,
timestamptz_to_str(recoveryStopTime))));
}
else
}
return stopsHere;
}
/*
* Same as recoveryStopsBefore, but called after applying the record.
*
* We also track the timestamp of the latest applied COMMIT/ABORT
* record in XLogCtl->recoveryLastXTime.
*/
static bool
recoveryStopsAfter(XLogRecord *record)
{
uint8 record_info;
TimestampTz recordXtime;
record_info = record->xl_info & ~XLR_INFO_MASK;
/*
* There can be many restore points that share the same name; we stop
* at the first one.
*/
if (recoveryTarget == RECOVERY_TARGET_NAME &&
record->xl_rmid == RM_XLOG_ID && record_info == XLOG_RESTORE_POINT)
{
xl_restore_point *recordRestorePointData;
recordRestorePointData = (xl_restore_point *) XLogRecGetData(record);
if (strcmp(recordRestorePointData->rp_name, recoveryTargetName) == 0)
{
strncpy(recoveryStopName, recordRPName, MAXFNAMELEN);
recoveryStopAfter = true;
recoveryStopXid = InvalidTransactionId;
(void) getRecordTimestamp(record, &recoveryStopTime);
strncpy(recoveryStopName, recordRestorePointData->rp_name, MAXFNAMELEN);
ereport(LOG,
(errmsg("recovery stopping at restore point \"%s\", time %s",
recoveryStopName,
timestamptz_to_str(recoveryStopTime))));
return true;
}
}
if (record->xl_rmid == RM_XACT_ID &&
(record_info == XLOG_XACT_COMMIT_COMPACT ||
record_info == XLOG_XACT_COMMIT ||
record_info == XLOG_XACT_ABORT))
{
/* Update the last applied transaction timestamp */
if (getRecordTimestamp(record, &recordXtime))
SetLatestXTime(recordXtime);
/*
* Note that if we use a RECOVERY_TARGET_TIME then we can stop at a
* restore point since they are timestamped, though the latest
* transaction time is not updated.
* There can be only one transaction end record with this exact
* transactionid
*
* when testing for an xid, we MUST test for equality only, since
* transactions are numbered in the order they start, not the order
* they complete. A higher numbered xid will complete before you about
* 50% of the time...
*/
if (record->xl_rmid == RM_XACT_ID && recoveryStopAfter)
SetLatestXTime(recordXtime);
if (recoveryTarget == RECOVERY_TARGET_XID && recoveryTargetInclusive &&
record->xl_xid == recoveryTargetXid)
{
recoveryStopAfter = true;
recoveryStopXid = record->xl_xid;
recoveryStopTime = recordXtime;
recoveryStopName[0] = '\0';
if (record_info == XLOG_XACT_COMMIT_COMPACT || record_info == XLOG_XACT_COMMIT)
{
ereport(LOG,
(errmsg("recovery stopping after commit of transaction %u, time %s",
recoveryStopXid,
timestamptz_to_str(recoveryStopTime))));
}
else if (record_info == XLOG_XACT_ABORT)
{
ereport(LOG,
(errmsg("recovery stopping after abort of transaction %u, time %s",
recoveryStopXid,
timestamptz_to_str(recoveryStopTime))));
}
return true;
}
}
else if (record->xl_rmid == RM_XACT_ID)
SetLatestXTime(recordXtime);
return stopsHere;
return false;
}
/*
......@@ -5853,23 +5878,11 @@ SetRecoveryPause(bool recoveryPause)
SpinLockRelease(&xlogctl->info_lck);
}
static bool
SetRecoveryDelayUntilTime(TimestampTz xtime)
{
if (min_recovery_apply_delay != 0)
{
recoveryDelayUntilTime =
TimestampTzPlusMilliseconds(xtime, min_recovery_apply_delay);
return true;
}
return false;
}
/*
* When min_recovery_apply_delay is set, we wait long enough to make sure
* certain record types are applied at least that interval behind the master.
* See recoveryStopsHere().
*
* Returns true if we waited.
*
* Note that the delay is calculated between the WAL record log time and
* the current time on standby. We would prefer to keep track of when this
......@@ -5878,14 +5891,49 @@ SetRecoveryDelayUntilTime(TimestampTz xtime)
* is significantly more effort and complexity for little actual gain in
* usability.
*/
static void
recoveryApplyDelay(void)
static bool
recoveryApplyDelay(XLogRecord *record)
{
while (true)
{
uint8 record_info;
TimestampTz xtime;
long secs;
int microsecs;
/* nothing to do if no delay configured */
if (min_recovery_apply_delay == 0)
return false;
/*
* Is it a COMMIT record?
*
* We deliberately choose not to delay aborts since they have no effect
* on MVCC. We already allow replay of records that don't have a
* timestamp, so there is already opportunity for issues caused by early
* conflicts on standbys.
*/
record_info = record->xl_info & ~XLR_INFO_MASK;
if (!(record->xl_rmid == RM_XACT_ID &&
(record_info == XLOG_XACT_COMMIT_COMPACT ||
record_info == XLOG_XACT_COMMIT)))
return false;
if (!getRecordTimestamp(record, &xtime))
return false;
recoveryDelayUntilTime =
TimestampTzPlusMilliseconds(xtime, min_recovery_apply_delay);
/*
* Exit without arming the latch if it's already past time to apply this
* record
*/
TimestampDifference(GetCurrentTimestamp(), recoveryDelayUntilTime,
&secs, &microsecs);
if (secs <= 0 && microsecs <=0)
return false;
while (true)
{
ResetLatch(&XLogCtl->recoveryWakeupLatch);
/* might change the trigger file's location */
......@@ -5911,6 +5959,7 @@ recoveryApplyDelay(void)
WL_LATCH_SET | WL_TIMEOUT | WL_POSTMASTER_DEATH,
secs * 1000L + microsecs / 1000);
}
return true;
}
/*
......@@ -6730,9 +6779,6 @@ StartupXLOG(void)
if (record != NULL)
{
bool recoveryContinue = true;
bool recoveryApply = true;
bool recoveryDelay = false;
ErrorContextCallback errcallback;
TimestampTz xtime;
......@@ -6792,13 +6838,9 @@ StartupXLOG(void)
/*
* Have we reached our recovery target?
*/
if (recoveryStopsHere(record, &recoveryApply, &recoveryDelay))
if (recoveryStopsBefore(record))
{
reachedStopPoint = true; /* see below */
recoveryContinue = false;
/* Exit loop if we reached non-inclusive recovery target */
if (!recoveryApply)
break;
}
......@@ -6806,10 +6848,8 @@ StartupXLOG(void)
* If we've been asked to lag the master, wait on
* latch until enough time has passed.
*/
if (recoveryDelay)
if (recoveryApplyDelay(record))
{
recoveryApplyDelay();
/*
* We test for paused recovery again here. If
* user sets delayed apply, it may be because
......@@ -6932,8 +6972,11 @@ StartupXLOG(void)
WalSndWakeup();
/* Exit loop if we reached inclusive recovery target */
if (!recoveryContinue)
if (recoveryStopsAfter(record))
{
reachedStopPoint = true;
break;
}
/* Else, try to fetch the next WAL record */
record = ReadRecord(xlogreader, InvalidXLogRecPtr, LOG, false);
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment