Commit e76c1a0f authored by Tom Lane's avatar Tom Lane

Replace max_standby_delay with two parameters, max_standby_archive_delay and

max_standby_streaming_delay, and revise the implementation to avoid assuming
that timestamps found in WAL records can meaningfully be compared to clock
time on the standby server.  Instead, the delay limits are compared to the
elapsed time since we last obtained a new WAL segment from archive or since
we were last "caught up" to WAL data arriving via streaming replication.
This avoids problems with clock skew between primary and standby, as well
as other corner cases that the original coding would misbehave in, such
as the primary server having significant idle time between transactions.
Per my complaint some time ago and considerable ensuing discussion.

Do some desultory editing on the hot standby documentation, too.
parent e6a7416e
<!-- $PostgreSQL: pgsql/doc/src/sgml/config.sgml,v 1.288 2010/06/30 02:43:10 momjian Exp $ --> <!-- $PostgreSQL: pgsql/doc/src/sgml/config.sgml,v 1.289 2010/07/03 20:43:57 tgl Exp $ -->
<chapter Id="runtime-config"> <chapter Id="runtime-config">
<title>Server Configuration</title> <title>Server Configuration</title>
...@@ -1841,6 +1841,8 @@ SET ENABLE_SEQSCAN TO OFF; ...@@ -1841,6 +1841,8 @@ SET ENABLE_SEQSCAN TO OFF;
<para> <para>
These settings control the behavior of the built-in These settings control the behavior of the built-in
<firstterm>streaming replication</> feature. <firstterm>streaming replication</> feature.
These parameters would be set on the primary server that is
to send replication data to one or more standby servers.
</para> </para>
<variablelist> <variablelist>
...@@ -1866,7 +1868,7 @@ SET ENABLE_SEQSCAN TO OFF; ...@@ -1866,7 +1868,7 @@ SET ENABLE_SEQSCAN TO OFF;
</indexterm> </indexterm>
<listitem> <listitem>
<para> <para>
Specifies the delay between activity rounds for the WAL sender. Specifies the delay between activity rounds for WAL sender processes.
In each round the WAL sender sends any WAL accumulated since the last In each round the WAL sender sends any WAL accumulated since the last
round to the standby server. It then sleeps for round to the standby server. It then sleeps for
<varname>wal_sender_delay</> milliseconds, and repeats. The default <varname>wal_sender_delay</> milliseconds, and repeats. The default
...@@ -1887,34 +1889,42 @@ SET ENABLE_SEQSCAN TO OFF; ...@@ -1887,34 +1889,42 @@ SET ENABLE_SEQSCAN TO OFF;
</indexterm> </indexterm>
<listitem> <listitem>
<para> <para>
Specifies the number of past log file segments kept in the Specifies the minimum number of past log file segments kept in the
<filename>pg_xlog</> <filename>pg_xlog</>
directory, in case a standby server needs to fetch them for streaming directory, in case a standby server needs to fetch them for streaming
replication. Each segment is normally 16 megabytes. If a standby replication. Each segment is normally 16 megabytes. If a standby
server connected to the primary falls behind by more than server connected to the primary falls behind by more than
<varname>wal_keep_segments</> segments, the primary might remove <varname>wal_keep_segments</> segments, the primary might remove
a WAL segment still needed by the standby, in which case the a WAL segment still needed by the standby, in which case the
replication connection will be terminated. replication connection will be terminated. (However, the standby
server can recover by fetching the segment from archive, if WAL
archiving is in use.)
</para> </para>
<para> <para>
This sets only the minimum number of segments retained for standby This sets only the minimum number of segments retained in
purposes; the system might need to retain more segments for WAL <filename>pg_xlog</>; the system might need to retain more segments
archival or to recover from a checkpoint. If <varname>wal_keep_segments</> for WAL archival or to recover from a checkpoint. If
is zero (the default), the system doesn't keep any extra segments <varname>wal_keep_segments</> is zero (the default), the system
for standby purposes, and the number of old WAL segments available doesn't keep any extra segments for standby purposes, and the number
for standbys is determined based only on the location of the previous of old WAL segments available to standby servers is a function of
checkpoint and status of WAL archiving. the location of the previous checkpoint and status of WAL
This parameter can only be set in the <filename>postgresql.conf</> archiving. This parameter can only be set in the
file or on the server command line. <filename>postgresql.conf</> file or on the server command line.
</para> </para>
</listitem> </listitem>
</varlistentry> </varlistentry>
</variablelist> </variablelist>
</sect2> </sect2>
<sect2 id="runtime-config-standby"> <sect2 id="runtime-config-standby">
<title>Standby Servers</title> <title>Standby Servers</title>
<para>
These settings control the behavior of a standby server that is
to receive replication data.
</para>
<variablelist> <variablelist>
<varlistentry id="guc-hot-standby" xreflabel="hot_standby"> <varlistentry id="guc-hot-standby" xreflabel="hot_standby">
...@@ -1933,38 +1943,63 @@ SET ENABLE_SEQSCAN TO OFF; ...@@ -1933,38 +1943,63 @@ SET ENABLE_SEQSCAN TO OFF;
</listitem> </listitem>
</varlistentry> </varlistentry>
<varlistentry id="guc-max-standby-delay" xreflabel="max_standby_delay"> <varlistentry id="guc-max-standby-archive-delay" xreflabel="max_standby_archive_delay">
<term><varname>max_standby_delay</varname> (<type>integer</type>)</term> <term><varname>max_standby_archive_delay</varname> (<type>integer</type>)</term>
<indexterm> <indexterm>
<primary><varname>max_standby_delay</> configuration parameter</primary> <primary><varname>max_standby_archive_delay</> configuration parameter</primary>
</indexterm> </indexterm>
<listitem> <listitem>
<para> <para>
When Hot Standby is active, this parameter specifies a wait policy When Hot Standby is active, this parameter determines how long the
for applying WAL entries that conflict with active queries. standby server should wait before canceling standby queries that
If a conflict should occur the server will delay up to this long conflict with about-to-be-applied WAL entries, as described in
before it cancels conflicting queries, as <xref linkend="hot-standby-conflict">.
described in <xref linkend="hot-standby-conflict">. <varname>max_standby_archive_delay</> applies when WAL data is
The default is 30 seconds (30 s). Units are milliseconds. being read from WAL archive (and is therefore not current).
A value of -1 causes the standby to wait forever for a conflicting The default is 30 seconds. Units are milliseconds if not specified.
query to complete. A value of -1 allows the standby to wait forever for conflicting
queries to complete.
This parameter can only be set in the <filename>postgresql.conf</> This parameter can only be set in the <filename>postgresql.conf</>
file or on the server command line. file or on the server command line.
</para> </para>
<para> <para>
A high value makes query cancel less likely. Note that <varname>max_standby_archive_delay</> is not the same as the
Increasing this parameter or setting it to -1 might delay master server maximum length of time a query can run before cancellation; rather it
changes from appearing on the standby. is the maximum total time allowed to apply any one WAL segment's data.
Thus, if one query has resulted in significant delay earlier in the
WAL segment, subsequent conflicting queries will have much less grace
time.
</para>
</listitem>
</varlistentry>
<varlistentry id="guc-max-standby-streaming-delay" xreflabel="max_standby_streaming_delay">
<term><varname>max_standby_streaming_delay</varname> (<type>integer</type>)</term>
<indexterm>
<primary><varname>max_standby_streaming_delay</> configuration parameter</primary>
</indexterm>
<listitem>
<para>
When Hot Standby is active, this parameter determines how long the
standby server should wait before canceling standby queries that
conflict with about-to-be-applied WAL entries, as described in
<xref linkend="hot-standby-conflict">.
<varname>max_standby_streaming_delay</> applies when WAL data is
being received via streaming replication.
The default is 30 seconds. Units are milliseconds if not specified.
A value of -1 allows the standby to wait forever for conflicting
queries to complete.
This parameter can only be set in the <filename>postgresql.conf</>
file or on the server command line.
</para> </para>
<para> <para>
While it is tempting to believe that <varname>max_standby_delay</> Note that <varname>max_standby_streaming_delay</> is not the same as
is the maximum length of time a query can run before the maximum length of time a query can run before cancellation; rather
cancellation is possible, this is not true. When a long-running it is the maximum total time allowed to apply WAL data once it has
query ends, there is a finite time required to apply backlogged been received from the primary server. Thus, if one query has
WAL logs. If a second long-running query appears before the resulted in significant delay, subsequent conflicting queries will
WAL has caught up, the snapshot taken by the second query will have much less grace time until the standby server has caught up
allow significantly less than <varname>max_standby_delay</> seconds again.
before query cancellation is possible.
</para> </para>
</listitem> </listitem>
</varlistentry> </varlistentry>
......
This diff is collapsed.
...@@ -7,7 +7,7 @@ ...@@ -7,7 +7,7 @@
* Portions Copyright (c) 1996-2010, PostgreSQL Global Development Group * Portions Copyright (c) 1996-2010, PostgreSQL Global Development Group
* Portions Copyright (c) 1994, Regents of the University of California * Portions Copyright (c) 1994, Regents of the University of California
* *
* $PostgreSQL: pgsql/src/backend/access/transam/xlog.c,v 1.427 2010/06/28 19:46:19 rhaas Exp $ * $PostgreSQL: pgsql/src/backend/access/transam/xlog.c,v 1.428 2010/07/03 20:43:57 tgl Exp $
* *
*------------------------------------------------------------------------- *-------------------------------------------------------------------------
*/ */
...@@ -72,7 +72,6 @@ int XLogArchiveTimeout = 0; ...@@ -72,7 +72,6 @@ int XLogArchiveTimeout = 0;
bool XLogArchiveMode = false; bool XLogArchiveMode = false;
char *XLogArchiveCommand = NULL; char *XLogArchiveCommand = NULL;
bool EnableHotStandby = false; bool EnableHotStandby = false;
int MaxStandbyDelay = 30 * 1000;
bool fullPageWrites = true; bool fullPageWrites = true;
bool log_checkpoints = false; bool log_checkpoints = false;
int sync_method = DEFAULT_SYNC_METHOD; int sync_method = DEFAULT_SYNC_METHOD;
...@@ -449,6 +448,15 @@ static ControlFileData *ControlFile = NULL; ...@@ -449,6 +448,15 @@ static ControlFileData *ControlFile = NULL;
*/ */
static XLogwrtResult LogwrtResult = {{0, 0}, {0, 0}}; static XLogwrtResult LogwrtResult = {{0, 0}, {0, 0}};
/*
* Codes indicating where we got a WAL file from during recovery, or where
* to attempt to get one. These are chosen so that they can be OR'd together
* in a bitmask state variable.
*/
#define XLOG_FROM_ARCHIVE (1<<0) /* Restored using restore_command */
#define XLOG_FROM_PG_XLOG (1<<1) /* Existing file in pg_xlog */
#define XLOG_FROM_STREAM (1<<2) /* Streamed from master */
/* /*
* openLogFile is -1 or a kernel FD for an open log file segment. * openLogFile is -1 or a kernel FD for an open log file segment.
* When it's open, openLogOff is the current seek offset in the file. * When it's open, openLogOff is the current seek offset in the file.
...@@ -460,14 +468,6 @@ static uint32 openLogId = 0; ...@@ -460,14 +468,6 @@ static uint32 openLogId = 0;
static uint32 openLogSeg = 0; static uint32 openLogSeg = 0;
static uint32 openLogOff = 0; static uint32 openLogOff = 0;
/*
* Codes indicating where we got a WAL file from during recovery, or where
* to attempt to get one.
*/
#define XLOG_FROM_ARCHIVE (1<<0) /* Restored using restore_command */
#define XLOG_FROM_PG_XLOG (1<<1) /* Existing file in pg_xlog */
#define XLOG_FROM_STREAM (1<<2) /* Streamed from master */
/* /*
* These variables are used similarly to the ones above, but for reading * These variables are used similarly to the ones above, but for reading
* the XLOG. Note, however, that readOff generally represents the offset * the XLOG. Note, however, that readOff generally represents the offset
...@@ -487,7 +487,16 @@ static int readSource = 0; /* XLOG_FROM_* code */ ...@@ -487,7 +487,16 @@ static int readSource = 0; /* XLOG_FROM_* code */
* Keeps track of which sources we've tried to read the current WAL * Keeps track of which sources we've tried to read the current WAL
* record from and failed. * record from and failed.
*/ */
static int failedSources = 0; static int failedSources = 0; /* OR of XLOG_FROM_* codes */
/*
* These variables track when we last obtained some WAL data to process,
* and where we got it from. (XLogReceiptSource is initially the same as
* readSource, but readSource gets reset to zero when we don't have data
* to process right now.)
*/
static TimestampTz XLogReceiptTime = 0;
static int XLogReceiptSource = 0; /* XLOG_FROM_* code */
/* Buffer for currently read page (XLOG_BLCKSZ bytes) */ /* Buffer for currently read page (XLOG_BLCKSZ bytes) */
static char *readBuf = NULL; static char *readBuf = NULL;
...@@ -2626,7 +2635,7 @@ XLogFileOpen(uint32 log, uint32 seg) ...@@ -2626,7 +2635,7 @@ XLogFileOpen(uint32 log, uint32 seg)
* Open a logfile segment for reading (during recovery). * Open a logfile segment for reading (during recovery).
* *
* If source = XLOG_FROM_ARCHIVE, the segment is retrieved from archive. * If source = XLOG_FROM_ARCHIVE, the segment is retrieved from archive.
* If source = XLOG_FROM_PG_XLOG, it's read from pg_xlog. * Otherwise, it's assumed to be already available in pg_xlog.
*/ */
static int static int
XLogFileRead(uint32 log, uint32 seg, int emode, TimeLineID tli, XLogFileRead(uint32 log, uint32 seg, int emode, TimeLineID tli,
...@@ -2655,6 +2664,7 @@ XLogFileRead(uint32 log, uint32 seg, int emode, TimeLineID tli, ...@@ -2655,6 +2664,7 @@ XLogFileRead(uint32 log, uint32 seg, int emode, TimeLineID tli,
break; break;
case XLOG_FROM_PG_XLOG: case XLOG_FROM_PG_XLOG:
case XLOG_FROM_STREAM:
XLogFilePath(path, tli, log, seg); XLogFilePath(path, tli, log, seg);
restoredFromArchive = false; restoredFromArchive = false;
break; break;
...@@ -2674,7 +2684,13 @@ XLogFileRead(uint32 log, uint32 seg, int emode, TimeLineID tli, ...@@ -2674,7 +2684,13 @@ XLogFileRead(uint32 log, uint32 seg, int emode, TimeLineID tli,
xlogfname); xlogfname);
set_ps_display(activitymsg, false); set_ps_display(activitymsg, false);
/* Track source of data in assorted state variables */
readSource = source; readSource = source;
XLogReceiptSource = source;
/* In FROM_STREAM case, caller tracks receipt time, not me */
if (source != XLOG_FROM_STREAM)
XLogReceiptTime = GetCurrentTimestamp();
return fd; return fd;
} }
if (errno != ENOENT || !notfoundOk) /* unexpected failure? */ if (errno != ENOENT || !notfoundOk) /* unexpected failure? */
...@@ -5568,7 +5584,7 @@ pg_is_in_recovery(PG_FUNCTION_ARGS) ...@@ -5568,7 +5584,7 @@ pg_is_in_recovery(PG_FUNCTION_ARGS)
/* /*
* Returns timestamp of last recovered commit/abort record. * Returns timestamp of last recovered commit/abort record.
*/ */
TimestampTz static TimestampTz
GetLatestXLogTime(void) GetLatestXLogTime(void)
{ {
/* use volatile pointer to prevent code rearrangement */ /* use volatile pointer to prevent code rearrangement */
...@@ -5581,6 +5597,23 @@ GetLatestXLogTime(void) ...@@ -5581,6 +5597,23 @@ GetLatestXLogTime(void)
return recoveryLastXTime; return recoveryLastXTime;
} }
/*
* Returns time of receipt of current chunk of XLOG data, as well as
* whether it was received from streaming replication or from archives.
*/
void
GetXLogReceiptTime(TimestampTz *rtime, bool *fromStream)
{
/*
* This must be executed in the startup process, since we don't export
* the relevant state to shared memory.
*/
Assert(InRecovery);
*rtime = XLogReceiptTime;
*fromStream = (XLogReceiptSource == XLOG_FROM_STREAM);
}
/* /*
* Note that text field supplied is a parameter name and does not require * Note that text field supplied is a parameter name and does not require
* translation * translation
...@@ -6060,6 +6093,9 @@ StartupXLOG(void) ...@@ -6060,6 +6093,9 @@ StartupXLOG(void)
xlogctl->recoveryLastRecPtr = ReadRecPtr; xlogctl->recoveryLastRecPtr = ReadRecPtr;
SpinLockRelease(&xlogctl->info_lck); SpinLockRelease(&xlogctl->info_lck);
/* Also ensure XLogReceiptTime has a sane value */
XLogReceiptTime = GetCurrentTimestamp();
/* /*
* Let postmaster know we've started redo now, so that it can * Let postmaster know we've started redo now, so that it can
* launch bgwriter to perform restartpoints. We don't bother * launch bgwriter to perform restartpoints. We don't bother
...@@ -7647,7 +7683,7 @@ CreateRestartPoint(int flags) ...@@ -7647,7 +7683,7 @@ CreateRestartPoint(int flags)
XLogRecPtr endptr; XLogRecPtr endptr;
/* Get the current (or recent) end of xlog */ /* Get the current (or recent) end of xlog */
endptr = GetWalRcvWriteRecPtr(); endptr = GetWalRcvWriteRecPtr(NULL);
PrevLogSeg(_logId, _logSeg); PrevLogSeg(_logId, _logSeg);
RemoveOldXlogFiles(_logId, _logSeg, endptr); RemoveOldXlogFiles(_logId, _logSeg, endptr);
...@@ -8757,7 +8793,7 @@ pg_last_xlog_receive_location(PG_FUNCTION_ARGS) ...@@ -8757,7 +8793,7 @@ pg_last_xlog_receive_location(PG_FUNCTION_ARGS)
XLogRecPtr recptr; XLogRecPtr recptr;
char location[MAXFNAMELEN]; char location[MAXFNAMELEN];
recptr = GetWalRcvWriteRecPtr(); recptr = GetWalRcvWriteRecPtr(NULL);
if (recptr.xlogid == 0 && recptr.xrecoff == 0) if (recptr.xlogid == 0 && recptr.xrecoff == 0)
PG_RETURN_NULL(); PG_RETURN_NULL();
...@@ -9272,6 +9308,8 @@ retry: ...@@ -9272,6 +9308,8 @@ retry:
{ {
if (WalRcvInProgress()) if (WalRcvInProgress())
{ {
bool havedata;
/* /*
* If we find an invalid record in the WAL streamed from * If we find an invalid record in the WAL streamed from
* master, something is seriously wrong. There's little * master, something is seriously wrong. There's little
...@@ -9289,28 +9327,62 @@ retry: ...@@ -9289,28 +9327,62 @@ retry:
} }
/* /*
* While walreceiver is active, wait for new WAL to arrive * Walreceiver is active, so see if new data has arrived.
* from primary. *
* We only advance XLogReceiptTime when we obtain fresh
* WAL from walreceiver and observe that we had already
* processed everything before the most recent "chunk"
* that it flushed to disk. In steady state where we are
* keeping up with the incoming data, XLogReceiptTime
* will be updated on each cycle. When we are behind,
* XLogReceiptTime will not advance, so the grace time
* alloted to conflicting queries will decrease.
*/ */
receivedUpto = GetWalRcvWriteRecPtr();
if (XLByteLT(*RecPtr, receivedUpto)) if (XLByteLT(*RecPtr, receivedUpto))
havedata = true;
else
{
XLogRecPtr latestChunkStart;
receivedUpto = GetWalRcvWriteRecPtr(&latestChunkStart);
if (XLByteLT(*RecPtr, receivedUpto))
{
havedata = true;
if (!XLByteLT(*RecPtr, latestChunkStart))
XLogReceiptTime = GetCurrentTimestamp();
}
else
havedata = false;
}
if (havedata)
{ {
/* /*
* Great, streamed far enough. Open the file if it's * Great, streamed far enough. Open the file if it's
* not open already. * not open already. Use XLOG_FROM_STREAM so that
* source info is set correctly and XLogReceiptTime
* isn't changed.
*/ */
if (readFile < 0) if (readFile < 0)
{ {
readFile = readFile =
XLogFileRead(readId, readSeg, PANIC, XLogFileRead(readId, readSeg, PANIC,
recoveryTargetTLI, recoveryTargetTLI,
XLOG_FROM_PG_XLOG, false); XLOG_FROM_STREAM, false);
Assert(readFile >= 0);
switched_segment = true; switched_segment = true;
}
else
{
/* just make sure source info is correct... */
readSource = XLOG_FROM_STREAM; readSource = XLOG_FROM_STREAM;
XLogReceiptSource = XLOG_FROM_STREAM;
} }
break; break;
} }
/*
* Data not here yet, so check for trigger then sleep.
*/
if (CheckForStandbyTrigger()) if (CheckForStandbyTrigger())
goto triggered; goto triggered;
...@@ -9388,7 +9460,7 @@ retry: ...@@ -9388,7 +9460,7 @@ retry:
readFile = XLogFileReadAnyTLI(readId, readSeg, DEBUG2, readFile = XLogFileReadAnyTLI(readId, readSeg, DEBUG2,
sources); sources);
switched_segment = true; switched_segment = true;
if (readFile != -1) if (readFile >= 0)
break; break;
/* /*
......
...@@ -29,7 +29,7 @@ ...@@ -29,7 +29,7 @@
* *
* *
* IDENTIFICATION * IDENTIFICATION
* $PostgreSQL: pgsql/src/backend/replication/walreceiver.c,v 1.14 2010/06/09 15:04:07 heikki Exp $ * $PostgreSQL: pgsql/src/backend/replication/walreceiver.c,v 1.15 2010/07/03 20:43:57 tgl Exp $
* *
*------------------------------------------------------------------------- *-------------------------------------------------------------------------
*/ */
...@@ -524,6 +524,7 @@ XLogWalRcvFlush(void) ...@@ -524,6 +524,7 @@ XLogWalRcvFlush(void)
/* Update shared-memory status */ /* Update shared-memory status */
SpinLockAcquire(&walrcv->mutex); SpinLockAcquire(&walrcv->mutex);
walrcv->latestChunkStart = walrcv->receivedUpto;
walrcv->receivedUpto = LogstreamResult.Flush; walrcv->receivedUpto = LogstreamResult.Flush;
SpinLockRelease(&walrcv->mutex); SpinLockRelease(&walrcv->mutex);
......
...@@ -10,7 +10,7 @@ ...@@ -10,7 +10,7 @@
* *
* *
* IDENTIFICATION * IDENTIFICATION
* $PostgreSQL: pgsql/src/backend/replication/walreceiverfuncs.c,v 1.5 2010/04/28 16:54:15 tgl Exp $ * $PostgreSQL: pgsql/src/backend/replication/walreceiverfuncs.c,v 1.6 2010/07/03 20:43:57 tgl Exp $
* *
*------------------------------------------------------------------------- *-------------------------------------------------------------------------
*/ */
...@@ -187,10 +187,11 @@ RequestXLogStreaming(XLogRecPtr recptr, const char *conninfo) ...@@ -187,10 +187,11 @@ RequestXLogStreaming(XLogRecPtr recptr, const char *conninfo)
if (recptr.xrecoff % XLogSegSize != 0) if (recptr.xrecoff % XLogSegSize != 0)
recptr.xrecoff -= recptr.xrecoff % XLogSegSize; recptr.xrecoff -= recptr.xrecoff % XLogSegSize;
SpinLockAcquire(&walrcv->mutex);
/* It better be stopped before we try to restart it */ /* It better be stopped before we try to restart it */
Assert(walrcv->walRcvState == WALRCV_STOPPED); Assert(walrcv->walRcvState == WALRCV_STOPPED);
SpinLockAcquire(&walrcv->mutex);
if (conninfo != NULL) if (conninfo != NULL)
strlcpy((char *) walrcv->conninfo, conninfo, MAXCONNINFO); strlcpy((char *) walrcv->conninfo, conninfo, MAXCONNINFO);
else else
...@@ -199,16 +200,22 @@ RequestXLogStreaming(XLogRecPtr recptr, const char *conninfo) ...@@ -199,16 +200,22 @@ RequestXLogStreaming(XLogRecPtr recptr, const char *conninfo)
walrcv->startTime = now; walrcv->startTime = now;
walrcv->receivedUpto = recptr; walrcv->receivedUpto = recptr;
walrcv->latestChunkStart = recptr;
SpinLockRelease(&walrcv->mutex); SpinLockRelease(&walrcv->mutex);
SendPostmasterSignal(PMSIGNAL_START_WALRECEIVER); SendPostmasterSignal(PMSIGNAL_START_WALRECEIVER);
} }
/* /*
* Returns the byte position that walreceiver has written * Returns the last+1 byte position that walreceiver has written.
*
* Optionally, returns the previous chunk start, that is the first byte
* written in the most recent walreceiver flush cycle. Callers not
* interested in that value may pass NULL for latestChunkStart.
*/ */
XLogRecPtr XLogRecPtr
GetWalRcvWriteRecPtr(void) GetWalRcvWriteRecPtr(XLogRecPtr *latestChunkStart)
{ {
/* use volatile pointer to prevent code rearrangement */ /* use volatile pointer to prevent code rearrangement */
volatile WalRcvData *walrcv = WalRcv; volatile WalRcvData *walrcv = WalRcv;
...@@ -216,6 +223,8 @@ GetWalRcvWriteRecPtr(void) ...@@ -216,6 +223,8 @@ GetWalRcvWriteRecPtr(void)
SpinLockAcquire(&walrcv->mutex); SpinLockAcquire(&walrcv->mutex);
recptr = walrcv->receivedUpto; recptr = walrcv->receivedUpto;
if (latestChunkStart)
*latestChunkStart = walrcv->latestChunkStart;
SpinLockRelease(&walrcv->mutex); SpinLockRelease(&walrcv->mutex);
return recptr; return recptr;
......
...@@ -11,7 +11,7 @@ ...@@ -11,7 +11,7 @@
* Portions Copyright (c) 1994, Regents of the University of California * Portions Copyright (c) 1994, Regents of the University of California
* *
* IDENTIFICATION * IDENTIFICATION
* $PostgreSQL: pgsql/src/backend/storage/ipc/standby.c,v 1.25 2010/06/14 00:49:24 itagaki Exp $ * $PostgreSQL: pgsql/src/backend/storage/ipc/standby.c,v 1.26 2010/07/03 20:43:58 tgl Exp $
* *
*------------------------------------------------------------------------- *-------------------------------------------------------------------------
*/ */
...@@ -30,7 +30,10 @@ ...@@ -30,7 +30,10 @@
#include "storage/standby.h" #include "storage/standby.h"
#include "utils/ps_status.h" #include "utils/ps_status.h"
/* User-settable GUC parameters */
int vacuum_defer_cleanup_age; int vacuum_defer_cleanup_age;
int max_standby_archive_delay = 30 * 1000;
int max_standby_streaming_delay = 30 * 1000;
static List *RecoveryLockList; static List *RecoveryLockList;
...@@ -40,13 +43,14 @@ static void ResolveRecoveryConflictWithLock(Oid dbOid, Oid relOid); ...@@ -40,13 +43,14 @@ static void ResolveRecoveryConflictWithLock(Oid dbOid, Oid relOid);
static void LogCurrentRunningXacts(RunningTransactions CurrRunningXacts); static void LogCurrentRunningXacts(RunningTransactions CurrRunningXacts);
static void LogAccessExclusiveLocks(int nlocks, xl_standby_lock *locks); static void LogAccessExclusiveLocks(int nlocks, xl_standby_lock *locks);
/* /*
* InitRecoveryTransactionEnvironment * InitRecoveryTransactionEnvironment
* Initiallize tracking of in-progress transactions in master * Initialize tracking of in-progress transactions in master
* *
* We need to issue shared invalidations and hold locks. Holding locks * We need to issue shared invalidations and hold locks. Holding locks
* means others may want to wait on us, so we need to make lock table * means others may want to wait on us, so we need to make a lock table
* inserts to appear like a transaction. We could create and delete * vxact entry like a real transaction. We could create and delete
* lock table entries for each transaction but its simpler just to create * lock table entries for each transaction but its simpler just to create
* one permanent entry and leave it there all the time. Locks are then * one permanent entry and leave it there all the time. Locks are then
* acquired and released as needed. Yes, this means you can see the * acquired and released as needed. Yes, this means you can see the
...@@ -58,7 +62,7 @@ InitRecoveryTransactionEnvironment(void) ...@@ -58,7 +62,7 @@ InitRecoveryTransactionEnvironment(void)
VirtualTransactionId vxid; VirtualTransactionId vxid;
/* /*
* Initialise shared invalidation management for Startup process, being * Initialize shared invalidation management for Startup process, being
* careful to register ourselves as a sendOnly process so we don't need to * careful to register ourselves as a sendOnly process so we don't need to
* read messages, nor will we get signalled when the queue starts filling * read messages, nor will we get signalled when the queue starts filling
* up. * up.
...@@ -113,6 +117,36 @@ ShutdownRecoveryTransactionEnvironment(void) ...@@ -113,6 +117,36 @@ ShutdownRecoveryTransactionEnvironment(void)
* ----------------------------------------------------- * -----------------------------------------------------
*/ */
/*
* Determine the cutoff time at which we want to start canceling conflicting
* transactions. Returns zero (a time safely in the past) if we are willing
* to wait forever.
*/
static TimestampTz
GetStandbyLimitTime(void)
{
TimestampTz rtime;
bool fromStream;
/*
* The cutoff time is the last WAL data receipt time plus the appropriate
* delay variable. Delay of -1 means wait forever.
*/
GetXLogReceiptTime(&rtime, &fromStream);
if (fromStream)
{
if (max_standby_streaming_delay < 0)
return 0; /* wait forever */
return TimestampTzPlusMilliseconds(rtime, max_standby_streaming_delay);
}
else
{
if (max_standby_archive_delay < 0)
return 0; /* wait forever */
return TimestampTzPlusMilliseconds(rtime, max_standby_archive_delay);
}
}
#define STANDBY_INITIAL_WAIT_US 1000 #define STANDBY_INITIAL_WAIT_US 1000
static int standbyWait_us = STANDBY_INITIAL_WAIT_US; static int standbyWait_us = STANDBY_INITIAL_WAIT_US;
...@@ -124,10 +158,11 @@ static int standbyWait_us = STANDBY_INITIAL_WAIT_US; ...@@ -124,10 +158,11 @@ static int standbyWait_us = STANDBY_INITIAL_WAIT_US;
static bool static bool
WaitExceedsMaxStandbyDelay(void) WaitExceedsMaxStandbyDelay(void)
{ {
/* Are we past max_standby_delay? */ TimestampTz ltime;
if (MaxStandbyDelay >= 0 &&
TimestampDifferenceExceeds(GetLatestXLogTime(), GetCurrentTimestamp(), /* Are we past the limit time? */
MaxStandbyDelay)) ltime = GetStandbyLimitTime();
if (ltime && GetCurrentTimestamp() >= ltime)
return true; return true;
/* /*
...@@ -203,8 +238,8 @@ ResolveRecoveryConflictWithVirtualXIDs(VirtualTransactionId *waitlist, ...@@ -203,8 +238,8 @@ ResolveRecoveryConflictWithVirtualXIDs(VirtualTransactionId *waitlist,
pid = CancelVirtualTransaction(*waitlist, reason); pid = CancelVirtualTransaction(*waitlist, reason);
/* /*
* Wait awhile for it to die so that we avoid flooding an * Wait a little bit for it to die so that we avoid flooding
* unresponsive backend when system is heavily loaded. * an unresponsive backend when system is heavily loaded.
*/ */
if (pid != 0) if (pid != 0)
pg_usleep(5000L); pg_usleep(5000L);
...@@ -286,7 +321,7 @@ void ...@@ -286,7 +321,7 @@ void
ResolveRecoveryConflictWithDatabase(Oid dbid) ResolveRecoveryConflictWithDatabase(Oid dbid)
{ {
/* /*
* We don't do ResolveRecoveryConflictWithVirutalXIDs() here since that * We don't do ResolveRecoveryConflictWithVirtualXIDs() here since that
* only waits for transactions and completely idle sessions would block * only waits for transactions and completely idle sessions would block
* us. This is rare enough that we do this as simply as possible: no wait, * us. This is rare enough that we do this as simply as possible: no wait,
* just force them off immediately. * just force them off immediately.
...@@ -355,12 +390,11 @@ ResolveRecoveryConflictWithLock(Oid dbOid, Oid relOid) ...@@ -355,12 +390,11 @@ ResolveRecoveryConflictWithLock(Oid dbOid, Oid relOid)
* the limit of our patience. The sleep in LockBufferForCleanup() is * the limit of our patience. The sleep in LockBufferForCleanup() is
* performed here, for code clarity. * performed here, for code clarity.
* *
* Resolve conflict by sending a SIGUSR1 reason to all backends to check if * Resolve conflicts by sending a PROCSIG signal to all backends to check if
* they hold one of the buffer pins that is blocking Startup process. If so, * they hold one of the buffer pins that is blocking Startup process. If so,
* backends will take an appropriate error action, ERROR or FATAL. * backends will take an appropriate error action, ERROR or FATAL.
* *
* We also check for deadlocks before we wait, though applications that cause * We also must check for deadlocks. Deadlocks occur because if queries
* these will be extremely rare. Deadlocks occur because if queries
* wait on a lock, that must be behind an AccessExclusiveLock, which can only * wait on a lock, that must be behind an AccessExclusiveLock, which can only
* be cleared if the Startup process replays a transaction completion record. * be cleared if the Startup process replays a transaction completion record.
* If Startup process is also waiting then that is a deadlock. The deadlock * If Startup process is also waiting then that is a deadlock. The deadlock
...@@ -368,41 +402,35 @@ ResolveRecoveryConflictWithLock(Oid dbOid, Oid relOid) ...@@ -368,41 +402,35 @@ ResolveRecoveryConflictWithLock(Oid dbOid, Oid relOid)
* Startup is sleeping and the query waits on a lock. We protect against * Startup is sleeping and the query waits on a lock. We protect against
* only the former sequence here, the latter sequence is checked prior to * only the former sequence here, the latter sequence is checked prior to
* the query sleeping, in CheckRecoveryConflictDeadlock(). * the query sleeping, in CheckRecoveryConflictDeadlock().
*
* Deadlocks are extremely rare, and relatively expensive to check for,
* so we don't do a deadlock check right away ... only if we have had to wait
* at least deadlock_timeout. Most of the logic about that is in proc.c.
*/ */
void void
ResolveRecoveryConflictWithBufferPin(void) ResolveRecoveryConflictWithBufferPin(void)
{ {
bool sig_alarm_enabled = false; bool sig_alarm_enabled = false;
TimestampTz ltime;
TimestampTz now;
Assert(InHotStandby); Assert(InHotStandby);
if (MaxStandbyDelay == 0) ltime = GetStandbyLimitTime();
{ now = GetCurrentTimestamp();
/*
* We don't want to wait, so just tell everybody holding the pin to
* get out of town.
*/
SendRecoveryConflictWithBufferPin(PROCSIG_RECOVERY_CONFLICT_BUFFERPIN);
}
else if (MaxStandbyDelay < 0)
{
TimestampTz now = GetCurrentTimestamp();
if (!ltime)
{
/* /*
* Set timeout for deadlock check (only) * We're willing to wait forever for conflicts, so set timeout for
* deadlock check (only)
*/ */
if (enable_standby_sig_alarm(now, now, true)) if (enable_standby_sig_alarm(now, now, true))
sig_alarm_enabled = true; sig_alarm_enabled = true;
else else
elog(FATAL, "could not set timer for process wakeup"); elog(FATAL, "could not set timer for process wakeup");
} }
else else if (now >= ltime)
{
TimestampTz then = GetLatestXLogTime();
TimestampTz now = GetCurrentTimestamp();
/* Are we past max_standby_delay? */
if (TimestampDifferenceExceeds(then, now, MaxStandbyDelay))
{ {
/* /*
* We're already behind, so clear a path as quickly as possible. * We're already behind, so clear a path as quickly as possible.
...@@ -411,24 +439,15 @@ ResolveRecoveryConflictWithBufferPin(void) ...@@ -411,24 +439,15 @@ ResolveRecoveryConflictWithBufferPin(void)
} }
else else
{ {
TimestampTz max_standby_time;
/* /*
* At what point in the future do we hit MaxStandbyDelay? * Wake up at ltime, and check for deadlocks as well if we will be
* waiting longer than deadlock_timeout
*/ */
max_standby_time = TimestampTzPlusMilliseconds(then, MaxStandbyDelay); if (enable_standby_sig_alarm(now, ltime, false))
Assert(max_standby_time > now);
/*
* Wake up at MaxStandby delay, and check for deadlocks as well
* if we will be waiting longer than deadlock_timeout
*/
if (enable_standby_sig_alarm(now, max_standby_time, false))
sig_alarm_enabled = true; sig_alarm_enabled = true;
else else
elog(FATAL, "could not set timer for process wakeup"); elog(FATAL, "could not set timer for process wakeup");
} }
}
/* Wait to be signaled by UnpinBuffer() */ /* Wait to be signaled by UnpinBuffer() */
ProcWaitForSignal(); ProcWaitForSignal();
......
...@@ -8,7 +8,7 @@ ...@@ -8,7 +8,7 @@
* *
* *
* IDENTIFICATION * IDENTIFICATION
* $PostgreSQL: pgsql/src/backend/storage/lmgr/proc.c,v 1.219 2010/05/26 19:52:52 sriggs Exp $ * $PostgreSQL: pgsql/src/backend/storage/lmgr/proc.c,v 1.220 2010/07/03 20:43:58 tgl Exp $
* *
*------------------------------------------------------------------------- *-------------------------------------------------------------------------
*/ */
...@@ -1627,12 +1627,13 @@ handle_sig_alarm(SIGNAL_ARGS) ...@@ -1627,12 +1627,13 @@ handle_sig_alarm(SIGNAL_ARGS)
bool bool
enable_standby_sig_alarm(TimestampTz now, TimestampTz fin_time, bool deadlock_only) enable_standby_sig_alarm(TimestampTz now, TimestampTz fin_time, bool deadlock_only)
{ {
TimestampTz deadlock_time = TimestampTzPlusMilliseconds(now, DeadlockTimeout); TimestampTz deadlock_time = TimestampTzPlusMilliseconds(now,
DeadlockTimeout);
if (deadlock_only) if (deadlock_only)
{ {
/* /*
* Wake up at DeadlockTimeout only, then wait forever * Wake up at deadlock_time only, then wait forever
*/ */
statement_fin_time = deadlock_time; statement_fin_time = deadlock_time;
deadlock_timeout_active = true; deadlock_timeout_active = true;
...@@ -1641,7 +1642,7 @@ enable_standby_sig_alarm(TimestampTz now, TimestampTz fin_time, bool deadlock_on ...@@ -1641,7 +1642,7 @@ enable_standby_sig_alarm(TimestampTz now, TimestampTz fin_time, bool deadlock_on
else if (fin_time > deadlock_time) else if (fin_time > deadlock_time)
{ {
/* /*
* Wake up at DeadlockTimeout, then again at MaxStandbyDelay * Wake up at deadlock_time, then again at fin_time
*/ */
statement_fin_time = deadlock_time; statement_fin_time = deadlock_time;
statement_fin_time2 = fin_time; statement_fin_time2 = fin_time;
...@@ -1651,7 +1652,7 @@ enable_standby_sig_alarm(TimestampTz now, TimestampTz fin_time, bool deadlock_on ...@@ -1651,7 +1652,7 @@ enable_standby_sig_alarm(TimestampTz now, TimestampTz fin_time, bool deadlock_on
else else
{ {
/* /*
* Wake only at MaxStandbyDelay because its fairly soon * Wake only at fin_time because its fairly soon
*/ */
statement_fin_time = fin_time; statement_fin_time = fin_time;
deadlock_timeout_active = false; deadlock_timeout_active = false;
...@@ -1729,15 +1730,16 @@ CheckStandbyTimeout(void) ...@@ -1729,15 +1730,16 @@ CheckStandbyTimeout(void)
if (deadlock_timeout_active) if (deadlock_timeout_active)
{ {
/* /*
* We're still waiting when we reach DeadlockTimeout, so send out a request * We're still waiting when we reach deadlock timeout, so send out
* to have other backends check themselves for deadlock. Then continue * a request to have other backends check themselves for
* waiting until MaxStandbyDelay. * deadlock. Then continue waiting until statement_fin_time,
* if that's set.
*/ */
SendRecoveryConflictWithBufferPin(PROCSIG_RECOVERY_CONFLICT_STARTUP_DEADLOCK); SendRecoveryConflictWithBufferPin(PROCSIG_RECOVERY_CONFLICT_STARTUP_DEADLOCK);
deadlock_timeout_active = false; deadlock_timeout_active = false;
/* /*
* Begin second waiting period to MaxStandbyDelay if required. * Begin second waiting period if required.
*/ */
if (statement_timeout_active) if (statement_timeout_active)
{ {
...@@ -1748,8 +1750,8 @@ CheckStandbyTimeout(void) ...@@ -1748,8 +1750,8 @@ CheckStandbyTimeout(void)
else else
{ {
/* /*
* We've now reached MaxStandbyDelay, so ask all conflicts to leave, cos * We've now reached statement_fin_time, so ask all conflicts to
* its time for us to press ahead with applying changes in recovery. * leave, so we can press ahead with applying changes in recovery.
*/ */
SendRecoveryConflictWithBufferPin(PROCSIG_RECOVERY_CONFLICT_BUFFERPIN); SendRecoveryConflictWithBufferPin(PROCSIG_RECOVERY_CONFLICT_BUFFERPIN);
} }
......
...@@ -10,7 +10,7 @@ ...@@ -10,7 +10,7 @@
* Written by Peter Eisentraut <peter_e@gmx.net>. * Written by Peter Eisentraut <peter_e@gmx.net>.
* *
* IDENTIFICATION * IDENTIFICATION
* $PostgreSQL: pgsql/src/backend/utils/misc/guc.c,v 1.557 2010/06/25 13:11:25 sriggs Exp $ * $PostgreSQL: pgsql/src/backend/utils/misc/guc.c,v 1.558 2010/07/03 20:43:58 tgl Exp $
* *
*-------------------------------------------------------------------- *--------------------------------------------------------------------
*/ */
...@@ -57,6 +57,7 @@ ...@@ -57,6 +57,7 @@
#include "postmaster/walwriter.h" #include "postmaster/walwriter.h"
#include "replication/walsender.h" #include "replication/walsender.h"
#include "storage/bufmgr.h" #include "storage/bufmgr.h"
#include "storage/standby.h"
#include "storage/fd.h" #include "storage/fd.h"
#include "tcop/tcopprot.h" #include "tcop/tcopprot.h"
#include "tsearch/ts_cache.h" #include "tsearch/ts_cache.h"
...@@ -116,7 +117,6 @@ extern char *default_tablespace; ...@@ -116,7 +117,6 @@ extern char *default_tablespace;
extern char *temp_tablespaces; extern char *temp_tablespaces;
extern bool synchronize_seqscans; extern bool synchronize_seqscans;
extern bool fullPageWrites; extern bool fullPageWrites;
extern int vacuum_defer_cleanup_age;
extern int ssl_renegotiation_limit; extern int ssl_renegotiation_limit;
#ifdef TRACE_SORT #ifdef TRACE_SORT
...@@ -1373,6 +1373,26 @@ static struct config_int ConfigureNamesInt[] = ...@@ -1373,6 +1373,26 @@ static struct config_int ConfigureNamesInt[] =
1000, 1, INT_MAX / 1000, NULL, NULL 1000, 1, INT_MAX / 1000, NULL, NULL
}, },
{
{"max_standby_archive_delay", PGC_SIGHUP, WAL_STANDBY_SERVERS,
gettext_noop("Sets the maximum delay before canceling queries when a hot standby server is processing archived WAL data."),
NULL,
GUC_UNIT_MS
},
&max_standby_archive_delay,
30 * 1000, -1, INT_MAX / 1000, NULL, NULL
},
{
{"max_standby_streaming_delay", PGC_SIGHUP, WAL_STANDBY_SERVERS,
gettext_noop("Sets the maximum delay before canceling queries when a hot standby server is processing streamed WAL data."),
NULL,
GUC_UNIT_MS
},
&max_standby_streaming_delay,
30 * 1000, -1, INT_MAX / 1000, NULL, NULL
},
/* /*
* Note: MaxBackends is limited to INT_MAX/4 because some places compute * Note: MaxBackends is limited to INT_MAX/4 because some places compute
* 4*MaxBackends without any overflow check. This check is made in * 4*MaxBackends without any overflow check. This check is made in
...@@ -1392,16 +1412,6 @@ static struct config_int ConfigureNamesInt[] = ...@@ -1392,16 +1412,6 @@ static struct config_int ConfigureNamesInt[] =
100, 1, INT_MAX / 4, assign_maxconnections, NULL 100, 1, INT_MAX / 4, assign_maxconnections, NULL
}, },
{
{"max_standby_delay", PGC_SIGHUP, WAL_STANDBY_SERVERS,
gettext_noop("Sets the maximum delay to avoid conflict processing on hot standby servers."),
NULL,
GUC_UNIT_MS
},
&MaxStandbyDelay,
30 * 1000, -1, INT_MAX / 1000, NULL, NULL
},
{ {
{"superuser_reserved_connections", PGC_POSTMASTER, CONN_AUTH_SETTINGS, {"superuser_reserved_connections", PGC_POSTMASTER, CONN_AUTH_SETTINGS,
gettext_noop("Sets the number of connection slots reserved for superusers."), gettext_noop("Sets the number of connection slots reserved for superusers."),
......
...@@ -186,15 +186,19 @@ ...@@ -186,15 +186,19 @@
# - Streaming Replication - # - Streaming Replication -
#max_wal_senders = 0 # max number of walsender processes #max_wal_senders = 0 # max number of walsender processes
#wal_sender_delay = 200ms # 1-10000 milliseconds #wal_sender_delay = 200ms # walsender cycle time, 1-10000 milliseconds
#wal_keep_segments = 0 # in logfile segments, 16MB each; 0 disables #wal_keep_segments = 0 # in logfile segments, 16MB each; 0 disables
# - Standby Servers - # - Standby Servers -
#hot_standby = off # allows queries during recovery #hot_standby = off # "on" allows queries during recovery
#max_standby_delay = 30s # max acceptable lag to allow queries to #max_standby_archive_delay = 30s # max delay before canceling queries
# complete without conflict; -1 means forever # when reading WAL from archive;
#vacuum_defer_cleanup_age = 0 # num transactions by which cleanup is deferred # -1 allows indefinite delay
#max_standby_streaming_delay = 30s # max delay before canceling queries
# when reading streaming WAL;
# -1 allows indefinite delay
#vacuum_defer_cleanup_age = 0 # number of transactions by which cleanup is deferred
#------------------------------------------------------------------------------ #------------------------------------------------------------------------------
......
...@@ -6,7 +6,7 @@ ...@@ -6,7 +6,7 @@
* Portions Copyright (c) 1996-2010, PostgreSQL Global Development Group * Portions Copyright (c) 1996-2010, PostgreSQL Global Development Group
* Portions Copyright (c) 1994, Regents of the University of California * Portions Copyright (c) 1994, Regents of the University of California
* *
* $PostgreSQL: pgsql/src/include/access/xlog.h,v 1.113 2010/06/17 16:41:25 tgl Exp $ * $PostgreSQL: pgsql/src/include/access/xlog.h,v 1.114 2010/07/03 20:43:58 tgl Exp $
*/ */
#ifndef XLOG_H #ifndef XLOG_H
#define XLOG_H #define XLOG_H
...@@ -135,22 +135,25 @@ typedef struct XLogRecData ...@@ -135,22 +135,25 @@ typedef struct XLogRecData
extern PGDLLIMPORT TimeLineID ThisTimeLineID; /* current TLI */ extern PGDLLIMPORT TimeLineID ThisTimeLineID; /* current TLI */
/* /*
* Prior to 8.4, all activity during recovery was carried out by Startup * Prior to 8.4, all activity during recovery was carried out by the startup
* process. This local variable continues to be used in many parts of the * process. This local variable continues to be used in many parts of the
* code to indicate actions taken by RecoveryManagers. Other processes who * code to indicate actions taken by RecoveryManagers. Other processes that
* potentially perform work during recovery should check RecoveryInProgress() * potentially perform work during recovery should check RecoveryInProgress().
* see XLogCtl notes in xlog.c * See XLogCtl notes in xlog.c.
*/ */
extern bool InRecovery; extern bool InRecovery;
/* /*
* Like InRecovery, standbyState is only valid in the startup process. * Like InRecovery, standbyState is only valid in the startup process.
* In all other processes it will have the value STANDBY_DISABLED (so
* InHotStandby will read as FALSE).
* *
* In DISABLED state, we're performing crash recovery or hot standby was * In DISABLED state, we're performing crash recovery or hot standby was
* disabled in recovery.conf. * disabled in recovery.conf.
* *
* In INITIALIZED state, we haven't yet received a RUNNING_XACTS or shutdown * In INITIALIZED state, we've run InitRecoveryTransactionEnvironment, but
* checkpoint record to initialize our master transaction tracking system. * we haven't yet processed a RUNNING_XACTS or shutdown-checkpoint WAL record
* to initialize our master-transaction tracking system.
* *
* When the transaction tracking is initialized, we enter the SNAPSHOT_PENDING * When the transaction tracking is initialized, we enter the SNAPSHOT_PENDING
* state. The tracked information might still be incomplete, so we can't allow * state. The tracked information might still be incomplete, so we can't allow
...@@ -168,6 +171,7 @@ typedef enum ...@@ -168,6 +171,7 @@ typedef enum
STANDBY_SNAPSHOT_PENDING, STANDBY_SNAPSHOT_PENDING,
STANDBY_SNAPSHOT_READY STANDBY_SNAPSHOT_READY
} HotStandbyState; } HotStandbyState;
extern HotStandbyState standbyState; extern HotStandbyState standbyState;
#define InHotStandby (standbyState >= STANDBY_SNAPSHOT_PENDING) #define InHotStandby (standbyState >= STANDBY_SNAPSHOT_PENDING)
...@@ -193,7 +197,6 @@ extern int XLogArchiveTimeout; ...@@ -193,7 +197,6 @@ extern int XLogArchiveTimeout;
extern bool XLogArchiveMode; extern bool XLogArchiveMode;
extern char *XLogArchiveCommand; extern char *XLogArchiveCommand;
extern bool EnableHotStandby; extern bool EnableHotStandby;
extern int MaxStandbyDelay;
extern bool log_checkpoints; extern bool log_checkpoints;
/* WAL levels */ /* WAL levels */
...@@ -279,7 +282,7 @@ extern void issue_xlog_fsync(int fd, uint32 log, uint32 seg); ...@@ -279,7 +282,7 @@ extern void issue_xlog_fsync(int fd, uint32 log, uint32 seg);
extern bool RecoveryInProgress(void); extern bool RecoveryInProgress(void);
extern bool XLogInsertAllowed(void); extern bool XLogInsertAllowed(void);
extern TimestampTz GetLatestXLogTime(void); extern void GetXLogReceiptTime(TimestampTz *rtime, bool *fromStream);
extern void UpdateControlFile(void); extern void UpdateControlFile(void);
extern uint64 GetSystemIdentifier(void); extern uint64 GetSystemIdentifier(void);
......
...@@ -5,7 +5,7 @@ ...@@ -5,7 +5,7 @@
* *
* Portions Copyright (c) 2010-2010, PostgreSQL Global Development Group * Portions Copyright (c) 2010-2010, PostgreSQL Global Development Group
* *
* $PostgreSQL: pgsql/src/include/replication/walreceiver.h,v 1.9 2010/06/03 22:17:32 tgl Exp $ * $PostgreSQL: pgsql/src/include/replication/walreceiver.h,v 1.10 2010/07/03 20:43:58 tgl Exp $
* *
*------------------------------------------------------------------------- *-------------------------------------------------------------------------
*/ */
...@@ -41,25 +41,35 @@ typedef enum ...@@ -41,25 +41,35 @@ typedef enum
typedef struct typedef struct
{ {
/* /*
* connection string; is used for walreceiver to connect with the primary. * PID of currently active walreceiver process, its current state and
*/ * start time (actually, the time at which it was requested to be started).
char conninfo[MAXCONNINFO];
/*
* PID of currently active walreceiver process, and the current state.
*/ */
pid_t pid; pid_t pid;
WalRcvState walRcvState; WalRcvState walRcvState;
pg_time_t startTime; pg_time_t startTime;
/* /*
* receivedUpto-1 is the last byte position that has been already * receivedUpto-1 is the last byte position that has already been
* received. When startup process starts the walreceiver, it sets this to * received. When startup process starts the walreceiver, it sets
* the point where it wants the streaming to begin. After that, * receivedUpto to the point where it wants the streaming to begin.
* walreceiver updates this whenever it flushes the received WAL. * After that, walreceiver updates this whenever it flushes the received
* WAL to disk.
*/ */
XLogRecPtr receivedUpto; XLogRecPtr receivedUpto;
/*
* latestChunkStart is the starting byte position of the current "batch"
* of received WAL. It's actually the same as the previous value of
* receivedUpto before the last flush to disk. Startup process can use
* this to detect whether it's keeping up or not.
*/
XLogRecPtr latestChunkStart;
/*
* connection string; is used for walreceiver to connect with the primary.
*/
char conninfo[MAXCONNINFO];
slock_t mutex; /* locks shared variables shown above */ slock_t mutex; /* locks shared variables shown above */
} WalRcvData; } WalRcvData;
...@@ -83,6 +93,6 @@ extern void ShutdownWalRcv(void); ...@@ -83,6 +93,6 @@ extern void ShutdownWalRcv(void);
extern bool WalRcvInProgress(void); extern bool WalRcvInProgress(void);
extern XLogRecPtr WaitNextXLogAvailable(XLogRecPtr recptr, bool *finished); extern XLogRecPtr WaitNextXLogAvailable(XLogRecPtr recptr, bool *finished);
extern void RequestXLogStreaming(XLogRecPtr recptr, const char *conninfo); extern void RequestXLogStreaming(XLogRecPtr recptr, const char *conninfo);
extern XLogRecPtr GetWalRcvWriteRecPtr(void); extern XLogRecPtr GetWalRcvWriteRecPtr(XLogRecPtr *latestChunkStart);
#endif /* _WALRECEIVER_H */ #endif /* _WALRECEIVER_H */
...@@ -7,7 +7,7 @@ ...@@ -7,7 +7,7 @@
* Portions Copyright (c) 1996-2010, PostgreSQL Global Development Group * Portions Copyright (c) 1996-2010, PostgreSQL Global Development Group
* Portions Copyright (c) 1994, Regents of the University of California * Portions Copyright (c) 1994, Regents of the University of California
* *
* $PostgreSQL: pgsql/src/include/storage/standby.h,v 1.10 2010/05/13 11:15:38 sriggs Exp $ * $PostgreSQL: pgsql/src/include/storage/standby.h,v 1.11 2010/07/03 20:43:58 tgl Exp $
* *
*------------------------------------------------------------------------- *-------------------------------------------------------------------------
*/ */
...@@ -19,7 +19,10 @@ ...@@ -19,7 +19,10 @@
#include "storage/procsignal.h" #include "storage/procsignal.h"
#include "storage/relfilenode.h" #include "storage/relfilenode.h"
/* User-settable GUC parameters */
extern int vacuum_defer_cleanup_age; extern int vacuum_defer_cleanup_age;
extern int max_standby_archive_delay;
extern int max_standby_streaming_delay;
extern void InitRecoveryTransactionEnvironment(void); extern void InitRecoveryTransactionEnvironment(void);
extern void ShutdownRecoveryTransactionEnvironment(void); extern void ShutdownRecoveryTransactionEnvironment(void);
...@@ -83,7 +86,7 @@ extern void standby_desc(StringInfo buf, uint8 xl_info, char *rec); ...@@ -83,7 +86,7 @@ extern void standby_desc(StringInfo buf, uint8 xl_info, char *rec);
/* /*
* Declarations for GetRunningTransactionData(). Similar to Snapshots, but * Declarations for GetRunningTransactionData(). Similar to Snapshots, but
* not quite. This has nothing at all to do with visibility on this server, * not quite. This has nothing at all to do with visibility on this server,
* so this is completely separate from snapmgr.c and snapmgr.h * so this is completely separate from snapmgr.c and snapmgr.h.
* This data is important for creating the initial snapshot state on a * This data is important for creating the initial snapshot state on a
* standby server. We need lots more information than a normal snapshot, * standby server. We need lots more information than a normal snapshot,
* hence we use a specific data structure for our needs. This data * hence we use a specific data structure for our needs. This data
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment