Commit 35250b6a authored by Simon Riggs's avatar Simon Riggs

New recovery target recovery_target_lsn

Michael Paquier
parent 0c40ab3a
......@@ -157,9 +157,10 @@ restore_command = 'copy "C:\\server\\archivedir\\%f" "%p"' # Windows
By default, recovery will recover to the end of the WAL log. The
following parameters can be used to specify an earlier stopping point.
At most one of <varname>recovery_target</>,
<varname>recovery_target_name</>, <varname>recovery_target_time</>, or
<varname>recovery_target_xid</> can be used; if more than one of these
is specified in the configuration file, the last entry will be used.
<varname>recovery_target_lsn</>, <varname>recovery_target_name</>,
<varname>recovery_target_time</>, or <varname>recovery_target_xid</>
can be used; if more than one of these is specified in the configuration
file, the last entry will be used.
</para>
<variablelist>
......@@ -232,6 +233,23 @@ restore_command = 'copy "C:\\server\\archivedir\\%f" "%p"' # Windows
</para>
</listitem>
</varlistentry>
<varlistentry id="recovery-target-lsn" xreflabel="recovery_target_lsn">
<term><varname>recovery_target_lsn</varname> (<type>pg_lsn</type>)
<indexterm>
<primary><varname>recovery_target_lsn</> recovery parameter</primary>
</indexterm>
</term>
<listitem>
<para>
This parameter specifies the LSN of the transaction log location up
to which recovery will proceed. The precise stopping point is also
influenced by <xref linkend="recovery-target-inclusive">. This
parameter is parsed using the system data type
<link linkend="datatype-pg-lsn"><type>pg_lsn</></link>.
</para>
</listitem>
</varlistentry>
</variablelist>
<para>
......
......@@ -67,8 +67,8 @@
# must set a recovery target.
#
# You may set a recovery target either by transactionId, by name,
# or by timestamp. Recovery may either include or exclude the
# transaction(s) with the recovery target value (ie, stop either
# by timestamp or by WAL position (LSN). Recovery may either include or
# exclude the transaction(s) with the recovery target value (ie, stop either
# just after or just before the given target, respectively).
#
#
......@@ -78,6 +78,8 @@
#
#recovery_target_xid = ''
#
#recovery_target_lsn = '' # e.g. '0/70006B8'
#
#recovery_target_inclusive = true
#
#
......
......@@ -67,6 +67,7 @@
#include "utils/builtins.h"
#include "utils/guc.h"
#include "utils/memutils.h"
#include "utils/pg_lsn.h"
#include "utils/ps_status.h"
#include "utils/relmapper.h"
#include "utils/snapmgr.h"
......@@ -254,6 +255,7 @@ static RecoveryTargetAction recoveryTargetAction = RECOVERY_TARGET_ACTION_PAUSE;
static TransactionId recoveryTargetXid;
static TimestampTz recoveryTargetTime;
static char *recoveryTargetName;
static XLogRecPtr recoveryTargetLSN;
static int recovery_min_apply_delay = 0;
static TimestampTz recoveryDelayUntilTime;
......@@ -275,6 +277,7 @@ static bool fast_promote = false;
*/
static TransactionId recoveryStopXid;
static TimestampTz recoveryStopTime;
static XLogRecPtr recoveryStopLSN;
static char recoveryStopName[MAXFNAMELEN];
static bool recoveryStopAfter;
......@@ -5078,6 +5081,23 @@ readRecoveryCommandFile(void)
(errmsg_internal("recovery_target_name = '%s'",
recoveryTargetName)));
}
else if (strcmp(item->name, "recovery_target_lsn") == 0)
{
recoveryTarget = RECOVERY_TARGET_LSN;
/*
* Convert the LSN string given by the user to XLogRecPtr form.
*/
recoveryTargetLSN =
DatumGetLSN(DirectFunctionCall3(pg_lsn_in,
CStringGetDatum(item->value),
ObjectIdGetDatum(InvalidOid),
Int32GetDatum(-1)));
ereport(DEBUG2,
(errmsg_internal("recovery_target_lsn = '%X/%X'",
(uint32) (recoveryTargetLSN >> 32),
(uint32) recoveryTargetLSN)));
}
else if (strcmp(item->name, "recovery_target") == 0)
{
if (strcmp(item->value, "immediate") == 0)
......@@ -5400,8 +5420,26 @@ recoveryStopsBefore(XLogReaderState *record)
recoveryStopAfter = false;
recoveryStopXid = InvalidTransactionId;
recoveryStopLSN = InvalidXLogRecPtr;
recoveryStopTime = 0;
recoveryStopName[0] = '\0';
return true;
}
/* Check if target LSN has been reached */
if (recoveryTarget == RECOVERY_TARGET_LSN &&
!recoveryTargetInclusive &&
record->ReadRecPtr >= recoveryTargetLSN)
{
recoveryStopAfter = false;
recoveryStopXid = InvalidTransactionId;
recoveryStopLSN = record->ReadRecPtr;
recoveryStopTime = 0;
recoveryStopName[0] = '\0';
ereport(LOG,
(errmsg("recovery stopping before WAL position (LSN) \"%X/%X\"",
(uint32) (recoveryStopLSN >> 32),
(uint32) recoveryStopLSN)));
return true;
}
......@@ -5479,6 +5517,7 @@ recoveryStopsBefore(XLogReaderState *record)
recoveryStopAfter = false;
recoveryStopXid = recordXid;
recoveryStopTime = recordXtime;
recoveryStopLSN = InvalidXLogRecPtr;
recoveryStopName[0] = '\0';
if (isCommit)
......@@ -5532,6 +5571,7 @@ recoveryStopsAfter(XLogReaderState *record)
{
recoveryStopAfter = true;
recoveryStopXid = InvalidTransactionId;
recoveryStopLSN = InvalidXLogRecPtr;
(void) getRecordTimestamp(record, &recoveryStopTime);
strlcpy(recoveryStopName, recordRestorePointData->rp_name, MAXFNAMELEN);
......@@ -5543,6 +5583,23 @@ recoveryStopsAfter(XLogReaderState *record)
}
}
/* Check if the target LSN has been reached */
if (recoveryTarget == RECOVERY_TARGET_LSN &&
recoveryTargetInclusive &&
record->ReadRecPtr >= recoveryTargetLSN)
{
recoveryStopAfter = true;
recoveryStopXid = InvalidTransactionId;
recoveryStopLSN = record->ReadRecPtr;
recoveryStopTime = 0;
recoveryStopName[0] = '\0';
ereport(LOG,
(errmsg("recovery stopping after WAL position (LSN) \"%X/%X\"",
(uint32) (recoveryStopLSN >> 32),
(uint32) recoveryStopLSN)));
return true;
}
if (rmid != RM_XACT_ID)
return false;
......@@ -5598,6 +5655,7 @@ recoveryStopsAfter(XLogReaderState *record)
recoveryStopAfter = true;
recoveryStopXid = recordXid;
recoveryStopTime = recordXtime;
recoveryStopLSN = InvalidXLogRecPtr;
recoveryStopName[0] = '\0';
if (xact_info == XLOG_XACT_COMMIT ||
......@@ -5629,6 +5687,7 @@ recoveryStopsAfter(XLogReaderState *record)
recoveryStopAfter = true;
recoveryStopXid = InvalidTransactionId;
recoveryStopTime = 0;
recoveryStopLSN = InvalidXLogRecPtr;
recoveryStopName[0] = '\0';
return true;
}
......@@ -6055,6 +6114,11 @@ StartupXLOG(void)
ereport(LOG,
(errmsg("starting point-in-time recovery to \"%s\"",
recoveryTargetName)));
else if (recoveryTarget == RECOVERY_TARGET_LSN)
ereport(LOG,
(errmsg("starting point-in-time recovery to WAL position (LSN) \"%X/%X\"",
(uint32) (recoveryTargetLSN >> 32),
(uint32) recoveryTargetLSN)));
else if (recoveryTarget == RECOVERY_TARGET_IMMEDIATE)
ereport(LOG,
(errmsg("starting point-in-time recovery to earliest consistent point")));
......@@ -7124,6 +7188,12 @@ StartupXLOG(void)
"%s %s\n",
recoveryStopAfter ? "after" : "before",
timestamptz_to_str(recoveryStopTime));
else if (recoveryTarget == RECOVERY_TARGET_LSN)
snprintf(reason, sizeof(reason),
"%s LSN %X/%X\n",
recoveryStopAfter ? "after" : "before",
(uint32 ) (recoveryStopLSN >> 32),
(uint32) recoveryStopLSN);
else if (recoveryTarget == RECOVERY_TARGET_NAME)
snprintf(reason, sizeof(reason),
"at restore point \"%s\"",
......
......@@ -83,6 +83,7 @@ typedef enum
RECOVERY_TARGET_XID,
RECOVERY_TARGET_TIME,
RECOVERY_TARGET_NAME,
RECOVERY_TARGET_LSN,
RECOVERY_TARGET_IMMEDIATE
} RecoveryTargetType;
......
......@@ -3,7 +3,7 @@ use strict;
use warnings;
use PostgresNode;
use TestLib;
use Test::More tests => 7;
use Test::More tests => 9;
# Create and test a standby from given backup, with a certain
# recovery target.
......@@ -86,6 +86,16 @@ my $lsn4 =
$node_master->safe_psql('postgres',
"SELECT pg_create_restore_point('$recovery_name');");
# And now for a recovery target LSN
$node_master->safe_psql('postgres',
"INSERT INTO tab_int VALUES (generate_series(4001,5000))");
my $recovery_lsn = $node_master->safe_psql('postgres', "SELECT pg_current_xlog_location()");
my $lsn5 =
$node_master->safe_psql('postgres', "SELECT pg_current_xlog_location();");
$node_master->safe_psql('postgres',
"INSERT INTO tab_int VALUES (generate_series(5001,6000))");
# Force archiving of WAL file
$node_master->safe_psql('postgres', "SELECT pg_switch_xlog()");
......@@ -102,6 +112,9 @@ test_recovery_standby('time', 'standby_3', $node_master, \@recovery_params,
@recovery_params = ("recovery_target_name = '$recovery_name'");
test_recovery_standby('name', 'standby_4', $node_master, \@recovery_params,
"4000", $lsn4);
@recovery_params = ("recovery_target_lsn = '$recovery_lsn'");
test_recovery_standby('LSN', 'standby_5', $node_master, \@recovery_params,
"5000", $lsn5);
# Multiple targets
# Last entry has priority (note that an array respects the order of items
......@@ -111,16 +124,23 @@ test_recovery_standby('name', 'standby_4', $node_master, \@recovery_params,
"recovery_target_xid = '$recovery_txid'",
"recovery_target_time = '$recovery_time'");
test_recovery_standby('name + XID + time',
'standby_5', $node_master, \@recovery_params, "3000", $lsn3);
'standby_6', $node_master, \@recovery_params, "3000", $lsn3);
@recovery_params = (
"recovery_target_time = '$recovery_time'",
"recovery_target_name = '$recovery_name'",
"recovery_target_xid = '$recovery_txid'");
test_recovery_standby('time + name + XID',
'standby_6', $node_master, \@recovery_params, "2000", $lsn2);
'standby_7', $node_master, \@recovery_params, "2000", $lsn2);
@recovery_params = (
"recovery_target_xid = '$recovery_txid'",
"recovery_target_time = '$recovery_time'",
"recovery_target_name = '$recovery_name'");
test_recovery_standby('XID + time + name',
'standby_7', $node_master, \@recovery_params, "4000", $lsn4);
'standby_8', $node_master, \@recovery_params, "4000", $lsn4);
@recovery_params = (
"recovery_target_xid = '$recovery_txid'",
"recovery_target_time = '$recovery_time'",
"recovery_target_name = '$recovery_name'",
"recovery_target_lsn = '$recovery_lsn'",);
test_recovery_standby('XID + time + name + LSN',
'standby_9', $node_master, \@recovery_params, "5000", $lsn5);
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment