Commit 1d257577 authored by Thomas Munro's avatar Thomas Munro

Optionally prefetch referenced data in recovery.

Introduce a new GUC recovery_prefetch, disabled by default.  When
enabled, look ahead in the WAL and try to initiate asynchronous reading
of referenced data blocks that are not yet cached in our buffer pool.
For now, this is done with posix_fadvise(), which has several caveats.
Better mechanisms will follow in later work on the I/O subsystem.

The GUC maintenance_io_concurrency is used to limit the number of
concurrent I/Os we allow ourselves to initiate, based on pessimistic
heuristics used to infer that I/Os have begun and completed.

The GUC wal_decode_buffer_size is used to limit the maximum distance we
are prepared to read ahead in the WAL to find uncached blocks.

Reviewed-by: Alvaro Herrera <alvherre@2ndquadrant.com> (parts)
Reviewed-by: Andres Freund <andres@anarazel.de> (parts)
Reviewed-by: Tomas Vondra <tomas.vondra@2ndquadrant.com> (parts)
Tested-by: default avatarTomas Vondra <tomas.vondra@2ndquadrant.com>
Tested-by: default avatarJakub Wartak <Jakub.Wartak@tomtom.com>
Tested-by: default avatarDmitry Dolgov <9erthalion6@gmail.com>
Tested-by: default avatarSait Talha Nisanci <Sait.Nisanci@microsoft.com>
Discussion: https://postgr.es/m/CA%2BhUKGJ4VJN8ttxScUFM8dOKX0BrBiboo5uz1cq%3DAovOddfHpA%40mail.gmail.com
parent f003d9f8
...@@ -3565,6 +3565,89 @@ include_dir 'conf.d' ...@@ -3565,6 +3565,89 @@ include_dir 'conf.d'
</variablelist> </variablelist>
</sect2> </sect2>
<sect2 id="runtime-config-wal-recovery">
<title>Recovery</title>
<indexterm>
<primary>configuration</primary>
<secondary>of recovery</secondary>
<tertiary>general settings</tertiary>
</indexterm>
<para>
This section describes the settings that apply to recovery in general,
affecting crash recovery, streaming replication and archive-based
replication.
</para>
<variablelist>
<varlistentry id="guc-recovery-prefetch" xreflabel="recovery_prefetch">
<term><varname>recovery_prefetch</varname> (<type>boolean</type>)
<indexterm>
<primary><varname>recovery_prefetch</varname> configuration parameter</primary>
</indexterm>
</term>
<listitem>
<para>
Whether to try to prefetch blocks that are referenced in the WAL that
are not yet in the buffer pool, during recovery. Prefetching blocks
that will soon be needed can reduce I/O wait times in some workloads.
See also the <xref linkend="guc-wal-decode-buffer-size"/> and
<xref linkend="guc-maintenance-io-concurrency"/> settings, which limit
prefetching activity.
This setting is disabled by default.
</para>
<para>
This feature currently depends on an effective
<function>posix_fadvise</function> function, which some
operating systems lack.
</para>
</listitem>
</varlistentry>
<varlistentry id="guc-recovery-prefetch-fpw" xreflabel="recovery_prefetch_fpw">
<term><varname>recovery_prefetch_fpw</varname> (<type>boolean</type>)
<indexterm>
<primary><varname>recovery_prefetch_fpw</varname> configuration parameter</primary>
</indexterm>
</term>
<listitem>
<para>
Whether to prefetch blocks that were logged with full page images,
during recovery. Often this doesn't help, since such blocks will not
be read the first time they are needed and might remain in the buffer
pool after that. However, on file systems with a block size larger
than
<productname>PostgreSQL</productname>'s, prefetching can avoid a
costly read-before-write when a blocks are later written.
The default is off.
</para>
</listitem>
</varlistentry>
<varlistentry id="guc-wal-decode-buffer-size" xreflabel="wal_decode_buffer_size">
<term><varname>wal_decode_buffer_size</varname> (<type>integer</type>)
<indexterm>
<primary><varname>wal_decode_buffer_size</varname> configuration parameter</primary>
</indexterm>
</term>
<listitem>
<para>
A limit on how far ahead the server can look in the WAL, to find
blocks to prefetch. Setting it too high might be counterproductive,
if it means that data falls out of the
kernel cache before it is needed. If this value is specified without
units, it is taken as bytes.
The default is 512kB.
</para>
</listitem>
</varlistentry>
</variablelist>
</sect2>
<sect2 id="runtime-config-wal-archive-recovery"> <sect2 id="runtime-config-wal-archive-recovery">
<title>Archive Recovery</title> <title>Archive Recovery</title>
......
...@@ -337,6 +337,13 @@ postgres 27093 0.0 0.0 30096 2752 ? Ss 11:34 0:00 postgres: ser ...@@ -337,6 +337,13 @@ postgres 27093 0.0 0.0 30096 2752 ? Ss 11:34 0:00 postgres: ser
</entry> </entry>
</row> </row>
<row>
<entry><structname>pg_stat_prefetch_recovery</structname><indexterm><primary>pg_stat_prefetch_recovery</primary></indexterm></entry>
<entry>Only one row, showing statistics about blocks prefetched during recovery.
See <xref linkend="pg-stat-prefetch-recovery-view"/> for details.
</entry>
</row>
<row> <row>
<entry><structname>pg_stat_subscription</structname><indexterm><primary>pg_stat_subscription</primary></indexterm></entry> <entry><structname>pg_stat_subscription</structname><indexterm><primary>pg_stat_subscription</primary></indexterm></entry>
<entry>At least one row per subscription, showing information about <entry>At least one row per subscription, showing information about
...@@ -2917,6 +2924,78 @@ SELECT pid, wait_event_type, wait_event FROM pg_stat_activity WHERE wait_event i ...@@ -2917,6 +2924,78 @@ SELECT pid, wait_event_type, wait_event FROM pg_stat_activity WHERE wait_event i
copy of the subscribed tables. copy of the subscribed tables.
</para> </para>
<table id="pg-stat-prefetch-recovery-view" xreflabel="pg_stat_prefetch_recovery">
<title><structname>pg_stat_prefetch_recovery</structname> View</title>
<tgroup cols="3">
<thead>
<row>
<entry>Column</entry>
<entry>Type</entry>
<entry>Description</entry>
</row>
</thead>
<tbody>
<row>
<entry><structfield>prefetch</structfield></entry>
<entry><type>bigint</type></entry>
<entry>Number of blocks prefetched because they were not in the buffer pool</entry>
</row>
<row>
<entry><structfield>skip_hit</structfield></entry>
<entry><type>bigint</type></entry>
<entry>Number of blocks not prefetched because they were already in the buffer pool</entry>
</row>
<row>
<entry><structfield>skip_new</structfield></entry>
<entry><type>bigint</type></entry>
<entry>Number of blocks not prefetched because they were new (usually relation extension)</entry>
</row>
<row>
<entry><structfield>skip_fpw</structfield></entry>
<entry><type>bigint</type></entry>
<entry>Number of blocks not prefetched because a full page image was included in the WAL and <xref linkend="guc-recovery-prefetch-fpw"/> was set to <literal>off</literal></entry>
</row>
<row>
<entry><structfield>skip_seq</structfield></entry>
<entry><type>bigint</type></entry>
<entry>Number of blocks not prefetched because of repeated access</entry>
</row>
<row>
<entry><structfield>distance</structfield></entry>
<entry><type>integer</type></entry>
<entry>How far ahead of recovery the prefetcher is currently reading, in bytes</entry>
</row>
<row>
<entry><structfield>queue_depth</structfield></entry>
<entry><type>integer</type></entry>
<entry>How many prefetches have been initiated but are not yet known to have completed</entry>
</row>
<row>
<entry><structfield>avg_distance</structfield></entry>
<entry><type>float4</type></entry>
<entry>How far ahead of recovery the prefetcher is on average, while recovery is not idle</entry>
</row>
<row>
<entry><structfield>avg_queue_depth</structfield></entry>
<entry><type>float4</type></entry>
<entry>Average number of prefetches in flight while recovery is not idle</entry>
</row>
</tbody>
</tgroup>
</table>
<para>
The <structname>pg_stat_prefetch_recovery</structname> view will contain only
one row. It is filled with nulls if recovery is not running or WAL
prefetching is not enabled. See <xref linkend="guc-recovery-prefetch"/>
for more information. The counters in this view are reset whenever the
<xref linkend="guc-recovery-prefetch"/>,
<xref linkend="guc-recovery-prefetch-fpw"/> or
<xref linkend="guc-maintenance-io-concurrency"/> setting is changed and
the server configuration is reloaded.
</para>
<table id="pg-stat-subscription" xreflabel="pg_stat_subscription"> <table id="pg-stat-subscription" xreflabel="pg_stat_subscription">
<title><structname>pg_stat_subscription</structname> View</title> <title><structname>pg_stat_subscription</structname> View</title>
<tgroup cols="1"> <tgroup cols="1">
...@@ -5049,8 +5128,11 @@ SELECT pid, wait_event_type, wait_event FROM pg_stat_activity WHERE wait_event i ...@@ -5049,8 +5128,11 @@ SELECT pid, wait_event_type, wait_event FROM pg_stat_activity WHERE wait_event i
all the counters shown in all the counters shown in
the <structname>pg_stat_bgwriter</structname> the <structname>pg_stat_bgwriter</structname>
view, <literal>archiver</literal> to reset all the counters shown in view, <literal>archiver</literal> to reset all the counters shown in
the <structname>pg_stat_archiver</structname> view or <literal>wal</literal> the <structname>pg_stat_archiver</structname> view,
to reset all the counters shown in the <structname>pg_stat_wal</structname> view. <literal>wal</literal> to reset all the counters shown in the
<structname>pg_stat_wal</structname> view or
<literal>prefetch_recovery</literal> to reset all the counters shown
in the <structname>pg_stat_prefetch_recovery</structname> view.
</para> </para>
<para> <para>
This function is restricted to superusers by default, but other users This function is restricted to superusers by default, but other users
......
...@@ -803,6 +803,23 @@ ...@@ -803,6 +803,23 @@
counted as <literal>wal_write</literal> and <literal>wal_sync</literal> counted as <literal>wal_write</literal> and <literal>wal_sync</literal>
in <structname>pg_stat_wal</structname>, respectively. in <structname>pg_stat_wal</structname>, respectively.
</para> </para>
<para>
The <xref linkend="guc-recovery-prefetch"/> parameter can
be used to improve I/O performance during recovery by instructing
<productname>PostgreSQL</productname> to initiate reads
of disk blocks that will soon be needed but are not currently in
<productname>PostgreSQL</productname>'s buffer pool.
The <xref linkend="guc-maintenance-io-concurrency"/> and
<xref linkend="guc-wal-decode-buffer-size"/> settings limit prefetching
concurrency and distance, respectively. The
prefetching mechanism is most likely to be effective on systems
with <varname>full_page_writes</varname> set to
<varname>off</varname> (where that is safe), and where the working
set is larger than RAM. By default, prefetching in recovery is enabled
on operating systems that have <function>posix_fadvise</function>
support.
</para>
</sect1> </sect1>
<sect1 id="wal-internals"> <sect1 id="wal-internals">
......
...@@ -31,6 +31,7 @@ OBJS = \ ...@@ -31,6 +31,7 @@ OBJS = \
xlogarchive.o \ xlogarchive.o \
xlogfuncs.o \ xlogfuncs.o \
xloginsert.o \ xloginsert.o \
xlogprefetch.o \
xlogreader.o \ xlogreader.o \
xlogutils.o xlogutils.o
......
...@@ -35,6 +35,7 @@ ...@@ -35,6 +35,7 @@
#include "access/xlog_internal.h" #include "access/xlog_internal.h"
#include "access/xlogarchive.h" #include "access/xlogarchive.h"
#include "access/xloginsert.h" #include "access/xloginsert.h"
#include "access/xlogprefetch.h"
#include "access/xlogreader.h" #include "access/xlogreader.h"
#include "access/xlogutils.h" #include "access/xlogutils.h"
#include "catalog/catversion.h" #include "catalog/catversion.h"
...@@ -110,6 +111,7 @@ int CommitDelay = 0; /* precommit delay in microseconds */ ...@@ -110,6 +111,7 @@ int CommitDelay = 0; /* precommit delay in microseconds */
int CommitSiblings = 5; /* # concurrent xacts needed to sleep */ int CommitSiblings = 5; /* # concurrent xacts needed to sleep */
int wal_retrieve_retry_interval = 5000; int wal_retrieve_retry_interval = 5000;
int max_slot_wal_keep_size_mb = -1; int max_slot_wal_keep_size_mb = -1;
int wal_decode_buffer_size = 512 * 1024;
bool track_wal_io_timing = false; bool track_wal_io_timing = false;
#ifdef WAL_DEBUG #ifdef WAL_DEBUG
...@@ -910,7 +912,8 @@ static int XLogFileRead(XLogSegNo segno, int emode, TimeLineID tli, ...@@ -910,7 +912,8 @@ static int XLogFileRead(XLogSegNo segno, int emode, TimeLineID tli,
XLogSource source, bool notfoundOk); XLogSource source, bool notfoundOk);
static int XLogFileReadAnyTLI(XLogSegNo segno, int emode, XLogSource source); static int XLogFileReadAnyTLI(XLogSegNo segno, int emode, XLogSource source);
static bool XLogPageRead(XLogReaderState *state, static bool XLogPageRead(XLogReaderState *state,
bool fetching_ckpt, int emode, bool randAccess); bool fetching_ckpt, int emode, bool randAccess,
bool nowait);
static bool WaitForWALToBecomeAvailable(XLogRecPtr RecPtr, bool randAccess, static bool WaitForWALToBecomeAvailable(XLogRecPtr RecPtr, bool randAccess,
bool fetching_ckpt, bool fetching_ckpt,
XLogRecPtr tliRecPtr, XLogRecPtr tliRecPtr,
...@@ -1461,7 +1464,7 @@ checkXLogConsistency(XLogReaderState *record) ...@@ -1461,7 +1464,7 @@ checkXLogConsistency(XLogReaderState *record)
* temporary page. * temporary page.
*/ */
buf = XLogReadBufferExtended(rnode, forknum, blkno, buf = XLogReadBufferExtended(rnode, forknum, blkno,
RBM_NORMAL_NO_LOG); RBM_NORMAL_NO_LOG, InvalidBuffer);
if (!BufferIsValid(buf)) if (!BufferIsValid(buf))
continue; continue;
...@@ -3729,7 +3732,6 @@ XLogFileRead(XLogSegNo segno, int emode, TimeLineID tli, ...@@ -3729,7 +3732,6 @@ XLogFileRead(XLogSegNo segno, int emode, TimeLineID tli,
snprintf(activitymsg, sizeof(activitymsg), "waiting for %s", snprintf(activitymsg, sizeof(activitymsg), "waiting for %s",
xlogfname); xlogfname);
set_ps_display(activitymsg); set_ps_display(activitymsg);
restoredFromArchive = RestoreArchivedFile(path, xlogfname, restoredFromArchive = RestoreArchivedFile(path, xlogfname,
"RECOVERYXLOG", "RECOVERYXLOG",
wal_segment_size, wal_segment_size,
...@@ -4389,9 +4391,9 @@ ReadRecord(XLogReaderState *xlogreader, int emode, ...@@ -4389,9 +4391,9 @@ ReadRecord(XLogReaderState *xlogreader, int emode,
while ((result = XLogReadRecord(xlogreader, &record, &errormsg)) while ((result = XLogReadRecord(xlogreader, &record, &errormsg))
== XLREAD_NEED_DATA) == XLREAD_NEED_DATA)
{ {
if (!XLogPageRead(xlogreader, fetching_ckpt, emode, randAccess)) if (!XLogPageRead(xlogreader, fetching_ckpt, emode, randAccess,
false /* wait for data if streaming */))
break; break;
} }
ReadRecPtr = xlogreader->ReadRecPtr; ReadRecPtr = xlogreader->ReadRecPtr;
...@@ -6632,6 +6634,12 @@ StartupXLOG(void) ...@@ -6632,6 +6634,12 @@ StartupXLOG(void)
errdetail("Failed while allocating a WAL reading processor."))); errdetail("Failed while allocating a WAL reading processor.")));
xlogreader->system_identifier = ControlFile->system_identifier; xlogreader->system_identifier = ControlFile->system_identifier;
/*
* Set the WAL decode buffer size. This limits how far ahead we can read
* in the WAL.
*/
XLogReaderSetDecodeBuffer(xlogreader, NULL, wal_decode_buffer_size);
/* /*
* Allocate two page buffers dedicated to WAL consistency checks. We do * Allocate two page buffers dedicated to WAL consistency checks. We do
* it this way, rather than just making static arrays, for two reasons: * it this way, rather than just making static arrays, for two reasons:
...@@ -7312,6 +7320,7 @@ StartupXLOG(void) ...@@ -7312,6 +7320,7 @@ StartupXLOG(void)
{ {
ErrorContextCallback errcallback; ErrorContextCallback errcallback;
TimestampTz xtime; TimestampTz xtime;
XLogPrefetchState prefetch;
PGRUsage ru0; PGRUsage ru0;
pg_rusage_init(&ru0); pg_rusage_init(&ru0);
...@@ -7322,6 +7331,9 @@ StartupXLOG(void) ...@@ -7322,6 +7331,9 @@ StartupXLOG(void)
(errmsg("redo starts at %X/%X", (errmsg("redo starts at %X/%X",
LSN_FORMAT_ARGS(ReadRecPtr)))); LSN_FORMAT_ARGS(ReadRecPtr))));
/* Prepare to prefetch, if configured. */
XLogPrefetchBegin(&prefetch, xlogreader);
/* /*
* main redo apply loop * main redo apply loop
*/ */
...@@ -7351,6 +7363,14 @@ StartupXLOG(void) ...@@ -7351,6 +7363,14 @@ StartupXLOG(void)
/* Handle interrupt signals of startup process */ /* Handle interrupt signals of startup process */
HandleStartupProcInterrupts(); HandleStartupProcInterrupts();
/* Perform WAL prefetching, if enabled. */
while (XLogPrefetch(&prefetch, xlogreader->ReadRecPtr) == XLREAD_NEED_DATA)
{
if (!XLogPageRead(xlogreader, false, LOG, false,
true /* don't wait for streaming data */))
break;
}
/* /*
* Pause WAL replay, if requested by a hot-standby session via * Pause WAL replay, if requested by a hot-standby session via
* SetRecoveryPause(). * SetRecoveryPause().
...@@ -7524,6 +7544,9 @@ StartupXLOG(void) ...@@ -7524,6 +7544,9 @@ StartupXLOG(void)
*/ */
if (AllowCascadeReplication()) if (AllowCascadeReplication())
WalSndWakeup(); WalSndWakeup();
/* Reset the prefetcher. */
XLogPrefetchReconfigure();
} }
/* Exit loop if we reached inclusive recovery target */ /* Exit loop if we reached inclusive recovery target */
...@@ -7540,6 +7563,7 @@ StartupXLOG(void) ...@@ -7540,6 +7563,7 @@ StartupXLOG(void)
/* /*
* end of main redo apply loop * end of main redo apply loop
*/ */
XLogPrefetchEnd(&prefetch);
if (reachedRecoveryTarget) if (reachedRecoveryTarget)
{ {
...@@ -12109,10 +12133,13 @@ CancelBackup(void) ...@@ -12109,10 +12133,13 @@ CancelBackup(void)
* and call XLogPageRead() again with the same arguments. This lets * and call XLogPageRead() again with the same arguments. This lets
* XLogPageRead() to try fetching the record from another source, or to * XLogPageRead() to try fetching the record from another source, or to
* sleep and retry. * sleep and retry.
*
* If nowait is true, then return false immediately if the requested data isn't
* available yet.
*/ */
static bool static bool
XLogPageRead(XLogReaderState *state, XLogPageRead(XLogReaderState *state,
bool fetching_ckpt, int emode, bool randAccess) bool fetching_ckpt, int emode, bool randAccess, bool nowait)
{ {
char *readBuf = state->readBuf; char *readBuf = state->readBuf;
XLogRecPtr targetPagePtr = state->readPagePtr; XLogRecPtr targetPagePtr = state->readPagePtr;
...@@ -12136,9 +12163,6 @@ XLogPageRead(XLogReaderState *state, ...@@ -12136,9 +12163,6 @@ XLogPageRead(XLogReaderState *state,
/* /*
* Request a restartpoint if we've replayed too much xlog since the * Request a restartpoint if we've replayed too much xlog since the
* last one. * last one.
*
* XXX Why is this here? Move it to recovery loop, since it's based
* on replay position, not read position?
*/ */
if (bgwriterLaunched) if (bgwriterLaunched)
{ {
...@@ -12163,6 +12187,12 @@ retry: ...@@ -12163,6 +12187,12 @@ retry:
(readSource == XLOG_FROM_STREAM && (readSource == XLOG_FROM_STREAM &&
flushedUpto < targetPagePtr + reqLen)) flushedUpto < targetPagePtr + reqLen))
{ {
if (nowait)
{
XLogReaderSetInputData(state, -1);
return false;
}
if (!WaitForWALToBecomeAvailable(targetPagePtr + reqLen, if (!WaitForWALToBecomeAvailable(targetPagePtr + reqLen,
randAccess, fetching_ckpt, randAccess, fetching_ckpt,
targetRecPtr, state->seg.ws_segno)) targetRecPtr, state->seg.ws_segno))
...@@ -12396,6 +12426,7 @@ WaitForWALToBecomeAvailable(XLogRecPtr RecPtr, bool randAccess, ...@@ -12396,6 +12426,7 @@ WaitForWALToBecomeAvailable(XLogRecPtr RecPtr, bool randAccess,
*/ */
currentSource = XLOG_FROM_STREAM; currentSource = XLOG_FROM_STREAM;
startWalReceiver = true; startWalReceiver = true;
XLogPrefetchReconfigure();
break; break;
case XLOG_FROM_STREAM: case XLOG_FROM_STREAM:
...@@ -12651,6 +12682,7 @@ WaitForWALToBecomeAvailable(XLogRecPtr RecPtr, bool randAccess, ...@@ -12651,6 +12682,7 @@ WaitForWALToBecomeAvailable(XLogRecPtr RecPtr, bool randAccess,
else else
havedata = false; havedata = false;
} }
if (havedata) if (havedata)
{ {
/* /*
......
This diff is collapsed.
...@@ -1927,6 +1927,8 @@ DecodeXLogRecord(XLogReaderState *state, ...@@ -1927,6 +1927,8 @@ DecodeXLogRecord(XLogReaderState *state,
blk->has_image = ((fork_flags & BKPBLOCK_HAS_IMAGE) != 0); blk->has_image = ((fork_flags & BKPBLOCK_HAS_IMAGE) != 0);
blk->has_data = ((fork_flags & BKPBLOCK_HAS_DATA) != 0); blk->has_data = ((fork_flags & BKPBLOCK_HAS_DATA) != 0);
blk->recent_buffer = InvalidBuffer;
COPY_HEADER_FIELD(&blk->data_len, sizeof(uint16)); COPY_HEADER_FIELD(&blk->data_len, sizeof(uint16));
/* cross-check that the HAS_DATA flag is set iff data_length > 0 */ /* cross-check that the HAS_DATA flag is set iff data_length > 0 */
if (blk->has_data && blk->data_len == 0) if (blk->has_data && blk->data_len == 0)
...@@ -2134,6 +2136,15 @@ err: ...@@ -2134,6 +2136,15 @@ err:
bool bool
XLogRecGetBlockTag(XLogReaderState *record, uint8 block_id, XLogRecGetBlockTag(XLogReaderState *record, uint8 block_id,
RelFileNode *rnode, ForkNumber *forknum, BlockNumber *blknum) RelFileNode *rnode, ForkNumber *forknum, BlockNumber *blknum)
{
return XLogRecGetRecentBuffer(record, block_id, rnode, forknum, blknum,
NULL);
}
bool
XLogRecGetRecentBuffer(XLogReaderState *record, uint8 block_id,
RelFileNode *rnode, ForkNumber *forknum,
BlockNumber *blknum, Buffer *recent_buffer)
{ {
DecodedBkpBlock *bkpb; DecodedBkpBlock *bkpb;
...@@ -2148,6 +2159,8 @@ XLogRecGetBlockTag(XLogReaderState *record, uint8 block_id, ...@@ -2148,6 +2159,8 @@ XLogRecGetBlockTag(XLogReaderState *record, uint8 block_id,
*forknum = bkpb->forknum; *forknum = bkpb->forknum;
if (blknum) if (blknum)
*blknum = bkpb->blkno; *blknum = bkpb->blkno;
if (recent_buffer)
*recent_buffer = bkpb->recent_buffer;
return true; return true;
} }
......
...@@ -335,11 +335,13 @@ XLogReadBufferForRedoExtended(XLogReaderState *record, ...@@ -335,11 +335,13 @@ XLogReadBufferForRedoExtended(XLogReaderState *record,
RelFileNode rnode; RelFileNode rnode;
ForkNumber forknum; ForkNumber forknum;
BlockNumber blkno; BlockNumber blkno;
Buffer recent_buffer;
Page page; Page page;
bool zeromode; bool zeromode;
bool willinit; bool willinit;
if (!XLogRecGetBlockTag(record, block_id, &rnode, &forknum, &blkno)) if (!XLogRecGetRecentBuffer(record, block_id, &rnode, &forknum, &blkno,
&recent_buffer))
{ {
/* Caller specified a bogus block_id */ /* Caller specified a bogus block_id */
elog(PANIC, "failed to locate backup block with ID %d", block_id); elog(PANIC, "failed to locate backup block with ID %d", block_id);
...@@ -361,7 +363,8 @@ XLogReadBufferForRedoExtended(XLogReaderState *record, ...@@ -361,7 +363,8 @@ XLogReadBufferForRedoExtended(XLogReaderState *record,
{ {
Assert(XLogRecHasBlockImage(record, block_id)); Assert(XLogRecHasBlockImage(record, block_id));
*buf = XLogReadBufferExtended(rnode, forknum, blkno, *buf = XLogReadBufferExtended(rnode, forknum, blkno,
get_cleanup_lock ? RBM_ZERO_AND_CLEANUP_LOCK : RBM_ZERO_AND_LOCK); get_cleanup_lock ? RBM_ZERO_AND_CLEANUP_LOCK : RBM_ZERO_AND_LOCK,
recent_buffer);
page = BufferGetPage(*buf); page = BufferGetPage(*buf);
if (!RestoreBlockImage(record, block_id, page)) if (!RestoreBlockImage(record, block_id, page))
elog(ERROR, "failed to restore block image"); elog(ERROR, "failed to restore block image");
...@@ -390,7 +393,8 @@ XLogReadBufferForRedoExtended(XLogReaderState *record, ...@@ -390,7 +393,8 @@ XLogReadBufferForRedoExtended(XLogReaderState *record,
} }
else else
{ {
*buf = XLogReadBufferExtended(rnode, forknum, blkno, mode); *buf = XLogReadBufferExtended(rnode, forknum, blkno, mode,
recent_buffer);
if (BufferIsValid(*buf)) if (BufferIsValid(*buf))
{ {
if (mode != RBM_ZERO_AND_LOCK && mode != RBM_ZERO_AND_CLEANUP_LOCK) if (mode != RBM_ZERO_AND_LOCK && mode != RBM_ZERO_AND_CLEANUP_LOCK)
...@@ -437,7 +441,8 @@ XLogReadBufferForRedoExtended(XLogReaderState *record, ...@@ -437,7 +441,8 @@ XLogReadBufferForRedoExtended(XLogReaderState *record,
*/ */
Buffer Buffer
XLogReadBufferExtended(RelFileNode rnode, ForkNumber forknum, XLogReadBufferExtended(RelFileNode rnode, ForkNumber forknum,
BlockNumber blkno, ReadBufferMode mode) BlockNumber blkno, ReadBufferMode mode,
Buffer recent_buffer)
{ {
BlockNumber lastblock; BlockNumber lastblock;
Buffer buffer; Buffer buffer;
...@@ -445,6 +450,15 @@ XLogReadBufferExtended(RelFileNode rnode, ForkNumber forknum, ...@@ -445,6 +450,15 @@ XLogReadBufferExtended(RelFileNode rnode, ForkNumber forknum,
Assert(blkno != P_NEW); Assert(blkno != P_NEW);
/* Do we have a clue where the buffer might be already? */
if (BufferIsValid(recent_buffer) &&
mode == RBM_NORMAL &&
ReadRecentBuffer(rnode, forknum, blkno, recent_buffer))
{
buffer = recent_buffer;
goto recent_buffer_fast_path;
}
/* Open the relation at smgr level */ /* Open the relation at smgr level */
smgr = smgropen(rnode, InvalidBackendId); smgr = smgropen(rnode, InvalidBackendId);
...@@ -503,6 +517,7 @@ XLogReadBufferExtended(RelFileNode rnode, ForkNumber forknum, ...@@ -503,6 +517,7 @@ XLogReadBufferExtended(RelFileNode rnode, ForkNumber forknum,
} }
} }
recent_buffer_fast_path:
if (mode == RBM_NORMAL) if (mode == RBM_NORMAL)
{ {
/* check that page has been initialized */ /* check that page has been initialized */
......
...@@ -911,6 +911,20 @@ CREATE VIEW pg_stat_wal_receiver AS ...@@ -911,6 +911,20 @@ CREATE VIEW pg_stat_wal_receiver AS
FROM pg_stat_get_wal_receiver() s FROM pg_stat_get_wal_receiver() s
WHERE s.pid IS NOT NULL; WHERE s.pid IS NOT NULL;
CREATE VIEW pg_stat_prefetch_recovery AS
SELECT
s.stats_reset,
s.prefetch,
s.skip_hit,
s.skip_new,
s.skip_fpw,
s.skip_seq,
s.distance,
s.queue_depth,
s.avg_distance,
s.avg_queue_depth
FROM pg_stat_get_prefetch_recovery() s;
CREATE VIEW pg_stat_subscription AS CREATE VIEW pg_stat_subscription AS
SELECT SELECT
su.oid AS subid, su.oid AS subid,
......
...@@ -38,6 +38,7 @@ ...@@ -38,6 +38,7 @@
#include "access/transam.h" #include "access/transam.h"
#include "access/twophase_rmgr.h" #include "access/twophase_rmgr.h"
#include "access/xact.h" #include "access/xact.h"
#include "access/xlogprefetch.h"
#include "catalog/partition.h" #include "catalog/partition.h"
#include "catalog/pg_database.h" #include "catalog/pg_database.h"
#include "catalog/pg_proc.h" #include "catalog/pg_proc.h"
...@@ -278,6 +279,7 @@ static PgStat_WalStats walStats; ...@@ -278,6 +279,7 @@ static PgStat_WalStats walStats;
static PgStat_SLRUStats slruStats[SLRU_NUM_ELEMENTS]; static PgStat_SLRUStats slruStats[SLRU_NUM_ELEMENTS];
static PgStat_ReplSlotStats *replSlotStats; static PgStat_ReplSlotStats *replSlotStats;
static int nReplSlotStats; static int nReplSlotStats;
static PgStat_RecoveryPrefetchStats recoveryPrefetchStats;
/* /*
* List of OIDs of databases we need to write out. If an entry is InvalidOid, * List of OIDs of databases we need to write out. If an entry is InvalidOid,
...@@ -349,6 +351,7 @@ static void pgstat_recv_archiver(PgStat_MsgArchiver *msg, int len); ...@@ -349,6 +351,7 @@ static void pgstat_recv_archiver(PgStat_MsgArchiver *msg, int len);
static void pgstat_recv_bgwriter(PgStat_MsgBgWriter *msg, int len); static void pgstat_recv_bgwriter(PgStat_MsgBgWriter *msg, int len);
static void pgstat_recv_wal(PgStat_MsgWal *msg, int len); static void pgstat_recv_wal(PgStat_MsgWal *msg, int len);
static void pgstat_recv_slru(PgStat_MsgSLRU *msg, int len); static void pgstat_recv_slru(PgStat_MsgSLRU *msg, int len);
static void pgstat_recv_recoveryprefetch(PgStat_MsgRecoveryPrefetch *msg, int len);
static void pgstat_recv_funcstat(PgStat_MsgFuncstat *msg, int len); static void pgstat_recv_funcstat(PgStat_MsgFuncstat *msg, int len);
static void pgstat_recv_funcpurge(PgStat_MsgFuncpurge *msg, int len); static void pgstat_recv_funcpurge(PgStat_MsgFuncpurge *msg, int len);
static void pgstat_recv_recoveryconflict(PgStat_MsgRecoveryConflict *msg, int len); static void pgstat_recv_recoveryconflict(PgStat_MsgRecoveryConflict *msg, int len);
...@@ -1424,11 +1427,20 @@ pgstat_reset_shared_counters(const char *target) ...@@ -1424,11 +1427,20 @@ pgstat_reset_shared_counters(const char *target)
msg.m_resettarget = RESET_BGWRITER; msg.m_resettarget = RESET_BGWRITER;
else if (strcmp(target, "wal") == 0) else if (strcmp(target, "wal") == 0)
msg.m_resettarget = RESET_WAL; msg.m_resettarget = RESET_WAL;
else if (strcmp(target, "prefetch_recovery") == 0)
{
/*
* We can't ask the stats collector to do this for us as it is not
* attached to shared memory.
*/
XLogPrefetchRequestResetStats();
return;
}
else else
ereport(ERROR, ereport(ERROR,
(errcode(ERRCODE_INVALID_PARAMETER_VALUE), (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
errmsg("unrecognized reset target: \"%s\"", target), errmsg("unrecognized reset target: \"%s\"", target),
errhint("Target must be \"archiver\", \"bgwriter\" or \"wal\"."))); errhint("Target must be \"archiver\", \"bgwriter\", \"wal\" or \"prefetch_recovery\".")));
pgstat_setheader(&msg.m_hdr, PGSTAT_MTYPE_RESETSHAREDCOUNTER); pgstat_setheader(&msg.m_hdr, PGSTAT_MTYPE_RESETSHAREDCOUNTER);
pgstat_send(&msg, sizeof(msg)); pgstat_send(&msg, sizeof(msg));
...@@ -2873,6 +2885,22 @@ pgstat_fetch_replslot(int *nslots_p) ...@@ -2873,6 +2885,22 @@ pgstat_fetch_replslot(int *nslots_p)
return replSlotStats; return replSlotStats;
} }
/*
* ---------
* pgstat_fetch_recoveryprefetch() -
*
* Support function for restoring the counters managed by xlogprefetch.c.
* ---------
*/
PgStat_RecoveryPrefetchStats *
pgstat_fetch_recoveryprefetch(void)
{
backend_read_statsfile();
return &recoveryPrefetchStats;
}
/* /*
* Shut down a single backend's statistics reporting at process exit. * Shut down a single backend's statistics reporting at process exit.
* *
...@@ -3148,6 +3176,23 @@ pgstat_send_slru(void) ...@@ -3148,6 +3176,23 @@ pgstat_send_slru(void)
} }
/* ----------
* pgstat_send_recoveryprefetch() -
*
* Send recovery prefetch statistics to the collector
* ----------
*/
void
pgstat_send_recoveryprefetch(PgStat_RecoveryPrefetchStats *stats)
{
PgStat_MsgRecoveryPrefetch msg;
pgstat_setheader(&msg.m_hdr, PGSTAT_MTYPE_RECOVERYPREFETCH);
msg.m_stats = *stats;
pgstat_send(&msg, sizeof(msg));
}
/* ---------- /* ----------
* PgstatCollectorMain() - * PgstatCollectorMain() -
* *
...@@ -3365,6 +3410,10 @@ PgstatCollectorMain(int argc, char *argv[]) ...@@ -3365,6 +3410,10 @@ PgstatCollectorMain(int argc, char *argv[])
pgstat_recv_slru(&msg.msg_slru, len); pgstat_recv_slru(&msg.msg_slru, len);
break; break;
case PGSTAT_MTYPE_RECOVERYPREFETCH:
pgstat_recv_recoveryprefetch(&msg.msg_recoveryprefetch, len);
break;
case PGSTAT_MTYPE_FUNCSTAT: case PGSTAT_MTYPE_FUNCSTAT:
pgstat_recv_funcstat(&msg.msg_funcstat, len); pgstat_recv_funcstat(&msg.msg_funcstat, len);
break; break;
...@@ -3658,6 +3707,13 @@ pgstat_write_statsfiles(bool permanent, bool allDbs) ...@@ -3658,6 +3707,13 @@ pgstat_write_statsfiles(bool permanent, bool allDbs)
rc = fwrite(slruStats, sizeof(slruStats), 1, fpout); rc = fwrite(slruStats, sizeof(slruStats), 1, fpout);
(void) rc; /* we'll check for error with ferror */ (void) rc; /* we'll check for error with ferror */
/*
* Write recovery prefetch stats struct
*/
rc = fwrite(&recoveryPrefetchStats, sizeof(recoveryPrefetchStats), 1,
fpout);
(void) rc; /* we'll check for error with ferror */
/* /*
* Walk through the database table. * Walk through the database table.
*/ */
...@@ -3933,6 +3989,7 @@ pgstat_read_statsfiles(Oid onlydb, bool permanent, bool deep) ...@@ -3933,6 +3989,7 @@ pgstat_read_statsfiles(Oid onlydb, bool permanent, bool deep)
memset(&archiverStats, 0, sizeof(archiverStats)); memset(&archiverStats, 0, sizeof(archiverStats));
memset(&walStats, 0, sizeof(walStats)); memset(&walStats, 0, sizeof(walStats));
memset(&slruStats, 0, sizeof(slruStats)); memset(&slruStats, 0, sizeof(slruStats));
memset(&recoveryPrefetchStats, 0, sizeof(recoveryPrefetchStats));
/* /*
* Set the current timestamp (will be kept only in case we can't load an * Set the current timestamp (will be kept only in case we can't load an
...@@ -4038,6 +4095,18 @@ pgstat_read_statsfiles(Oid onlydb, bool permanent, bool deep) ...@@ -4038,6 +4095,18 @@ pgstat_read_statsfiles(Oid onlydb, bool permanent, bool deep)
goto done; goto done;
} }
/*
* Read recoveryPrefetchStats struct
*/
if (fread(&recoveryPrefetchStats, 1, sizeof(recoveryPrefetchStats),
fpin) != sizeof(recoveryPrefetchStats))
{
ereport(pgStatRunningInCollector ? LOG : WARNING,
(errmsg("corrupted statistics file \"%s\"", statfile)));
memset(&recoveryPrefetchStats, 0, sizeof(recoveryPrefetchStats));
goto done;
}
/* /*
* We found an existing collector stats file. Read it and put all the * We found an existing collector stats file. Read it and put all the
* hashtable entries into place. * hashtable entries into place.
...@@ -4356,6 +4425,7 @@ pgstat_read_db_statsfile_timestamp(Oid databaseid, bool permanent, ...@@ -4356,6 +4425,7 @@ pgstat_read_db_statsfile_timestamp(Oid databaseid, bool permanent,
PgStat_WalStats myWalStats; PgStat_WalStats myWalStats;
PgStat_SLRUStats mySLRUStats[SLRU_NUM_ELEMENTS]; PgStat_SLRUStats mySLRUStats[SLRU_NUM_ELEMENTS];
PgStat_ReplSlotStats myReplSlotStats; PgStat_ReplSlotStats myReplSlotStats;
PgStat_RecoveryPrefetchStats myRecoveryPrefetchStats;
FILE *fpin; FILE *fpin;
int32 format_id; int32 format_id;
const char *statfile = permanent ? PGSTAT_STAT_PERMANENT_FILENAME : pgstat_stat_filename; const char *statfile = permanent ? PGSTAT_STAT_PERMANENT_FILENAME : pgstat_stat_filename;
...@@ -4432,6 +4502,18 @@ pgstat_read_db_statsfile_timestamp(Oid databaseid, bool permanent, ...@@ -4432,6 +4502,18 @@ pgstat_read_db_statsfile_timestamp(Oid databaseid, bool permanent,
return false; return false;
} }
/*
* Read recovery prefetch stats struct
*/
if (fread(&myRecoveryPrefetchStats, 1, sizeof(myRecoveryPrefetchStats),
fpin) != sizeof(myRecoveryPrefetchStats))
{
ereport(pgStatRunningInCollector ? LOG : WARNING,
(errmsg("corrupted statistics file \"%s\"", statfile)));
FreeFile(fpin);
return false;
}
/* By default, we're going to return the timestamp of the global file. */ /* By default, we're going to return the timestamp of the global file. */
*ts = myGlobalStats.stats_timestamp; *ts = myGlobalStats.stats_timestamp;
...@@ -4615,6 +4697,13 @@ backend_read_statsfile(void) ...@@ -4615,6 +4697,13 @@ backend_read_statsfile(void)
if (ok && file_ts >= min_ts) if (ok && file_ts >= min_ts)
break; break;
/*
* If we're in crash recovery, the collector may not even be running,
* so work with what we have.
*/
if (InRecovery)
break;
/* Not there or too old, so kick the collector and wait a bit */ /* Not there or too old, so kick the collector and wait a bit */
if ((count % PGSTAT_INQ_LOOP_COUNT) == 0) if ((count % PGSTAT_INQ_LOOP_COUNT) == 0)
pgstat_send_inquiry(cur_ts, min_ts, inquiry_db); pgstat_send_inquiry(cur_ts, min_ts, inquiry_db);
...@@ -5349,6 +5438,18 @@ pgstat_recv_slru(PgStat_MsgSLRU *msg, int len) ...@@ -5349,6 +5438,18 @@ pgstat_recv_slru(PgStat_MsgSLRU *msg, int len)
slruStats[msg->m_index].truncate += msg->m_truncate; slruStats[msg->m_index].truncate += msg->m_truncate;
} }
/* ----------
* pgstat_recv_recoveryprefetch() -
*
* Process a recovery prefetch message.
* ----------
*/
static void
pgstat_recv_recoveryprefetch(PgStat_MsgRecoveryPrefetch *msg, int len)
{
recoveryPrefetchStats = msg->m_stats;
}
/* ---------- /* ----------
* pgstat_recv_recoveryconflict() - * pgstat_recv_recoveryconflict() -
* *
......
...@@ -210,7 +210,8 @@ XLogRecordPageWithFreeSpace(RelFileNode rnode, BlockNumber heapBlk, ...@@ -210,7 +210,8 @@ XLogRecordPageWithFreeSpace(RelFileNode rnode, BlockNumber heapBlk,
blkno = fsm_logical_to_physical(addr); blkno = fsm_logical_to_physical(addr);
/* If the page doesn't exist already, extend */ /* If the page doesn't exist already, extend */
buf = XLogReadBufferExtended(rnode, FSM_FORKNUM, blkno, RBM_ZERO_ON_ERROR); buf = XLogReadBufferExtended(rnode, FSM_FORKNUM, blkno, RBM_ZERO_ON_ERROR,
InvalidBuffer);
LockBuffer(buf, BUFFER_LOCK_EXCLUSIVE); LockBuffer(buf, BUFFER_LOCK_EXCLUSIVE);
page = BufferGetPage(buf); page = BufferGetPage(buf);
......
...@@ -22,6 +22,7 @@ ...@@ -22,6 +22,7 @@
#include "access/subtrans.h" #include "access/subtrans.h"
#include "access/syncscan.h" #include "access/syncscan.h"
#include "access/twophase.h" #include "access/twophase.h"
#include "access/xlogprefetch.h"
#include "commands/async.h" #include "commands/async.h"
#include "miscadmin.h" #include "miscadmin.h"
#include "pgstat.h" #include "pgstat.h"
...@@ -126,6 +127,7 @@ CreateSharedMemoryAndSemaphores(void) ...@@ -126,6 +127,7 @@ CreateSharedMemoryAndSemaphores(void)
size = add_size(size, PredicateLockShmemSize()); size = add_size(size, PredicateLockShmemSize());
size = add_size(size, ProcGlobalShmemSize()); size = add_size(size, ProcGlobalShmemSize());
size = add_size(size, XLOGShmemSize()); size = add_size(size, XLOGShmemSize());
size = add_size(size, XLogPrefetchShmemSize());
size = add_size(size, CLOGShmemSize()); size = add_size(size, CLOGShmemSize());
size = add_size(size, CommitTsShmemSize()); size = add_size(size, CommitTsShmemSize());
size = add_size(size, SUBTRANSShmemSize()); size = add_size(size, SUBTRANSShmemSize());
...@@ -217,6 +219,7 @@ CreateSharedMemoryAndSemaphores(void) ...@@ -217,6 +219,7 @@ CreateSharedMemoryAndSemaphores(void)
* Set up xlog, clog, and buffers * Set up xlog, clog, and buffers
*/ */
XLOGShmemInit(); XLOGShmemInit();
XLogPrefetchShmemInit();
CLOGShmemInit(); CLOGShmemInit();
CommitTsShmemInit(); CommitTsShmemInit();
SUBTRANSShmemInit(); SUBTRANSShmemInit();
......
...@@ -41,6 +41,7 @@ ...@@ -41,6 +41,7 @@
#include "access/twophase.h" #include "access/twophase.h"
#include "access/xact.h" #include "access/xact.h"
#include "access/xlog_internal.h" #include "access/xlog_internal.h"
#include "access/xlogprefetch.h"
#include "catalog/namespace.h" #include "catalog/namespace.h"
#include "catalog/pg_authid.h" #include "catalog/pg_authid.h"
#include "catalog/storage.h" #include "catalog/storage.h"
...@@ -209,6 +210,7 @@ static bool check_effective_io_concurrency(int *newval, void **extra, GucSource ...@@ -209,6 +210,7 @@ static bool check_effective_io_concurrency(int *newval, void **extra, GucSource
static bool check_maintenance_io_concurrency(int *newval, void **extra, GucSource source); static bool check_maintenance_io_concurrency(int *newval, void **extra, GucSource source);
static bool check_huge_page_size(int *newval, void **extra, GucSource source); static bool check_huge_page_size(int *newval, void **extra, GucSource source);
static bool check_client_connection_check_interval(int *newval, void **extra, GucSource source); static bool check_client_connection_check_interval(int *newval, void **extra, GucSource source);
static void assign_maintenance_io_concurrency(int newval, void *extra);
static void assign_pgstat_temp_directory(const char *newval, void *extra); static void assign_pgstat_temp_directory(const char *newval, void *extra);
static bool check_application_name(char **newval, void **extra, GucSource source); static bool check_application_name(char **newval, void **extra, GucSource source);
static void assign_application_name(const char *newval, void *extra); static void assign_application_name(const char *newval, void *extra);
...@@ -1294,6 +1296,27 @@ static struct config_bool ConfigureNamesBool[] = ...@@ -1294,6 +1296,27 @@ static struct config_bool ConfigureNamesBool[] =
true, true,
NULL, NULL, NULL NULL, NULL, NULL
}, },
{
{"recovery_prefetch", PGC_SIGHUP, WAL_SETTINGS,
gettext_noop("Prefetch referenced blocks during recovery"),
gettext_noop("Read ahead of the current replay position to find uncached blocks.")
},
&recovery_prefetch,
false,
NULL, assign_recovery_prefetch, NULL
},
{
{"recovery_prefetch_fpw", PGC_SIGHUP, WAL_SETTINGS,
gettext_noop("Prefetch blocks that have full page images in the WAL"),
gettext_noop("On some systems, there is no benefit to prefetching pages that will be "
"entirely overwritten, but if the logical page size of the filesystem is "
"larger than PostgreSQL's, this can be beneficial. This option has no "
"effect unless recovery_prefetch is enabled.")
},
&recovery_prefetch_fpw,
false,
NULL, assign_recovery_prefetch_fpw, NULL
},
{ {
{"wal_log_hints", PGC_POSTMASTER, WAL_SETTINGS, {"wal_log_hints", PGC_POSTMASTER, WAL_SETTINGS,
...@@ -2748,6 +2771,17 @@ static struct config_int ConfigureNamesInt[] = ...@@ -2748,6 +2771,17 @@ static struct config_int ConfigureNamesInt[] =
NULL, NULL, NULL NULL, NULL, NULL
}, },
{
{"wal_decode_buffer_size", PGC_POSTMASTER, WAL_ARCHIVE_RECOVERY,
gettext_noop("Maximum buffer size for reading ahead in the WAL during recovery."),
gettext_noop("This controls the maximum distance we can read ahead n the WAL to prefetch referenced blocks."),
GUC_UNIT_BYTE
},
&wal_decode_buffer_size,
512 * 1024, 64 * 1024, INT_MAX,
NULL, NULL, NULL
},
{ {
{"wal_keep_size", PGC_SIGHUP, REPLICATION_SENDING, {"wal_keep_size", PGC_SIGHUP, REPLICATION_SENDING,
gettext_noop("Sets the size of WAL files held for standby servers."), gettext_noop("Sets the size of WAL files held for standby servers."),
...@@ -3068,7 +3102,8 @@ static struct config_int ConfigureNamesInt[] = ...@@ -3068,7 +3102,8 @@ static struct config_int ConfigureNamesInt[] =
0, 0,
#endif #endif
0, MAX_IO_CONCURRENCY, 0, MAX_IO_CONCURRENCY,
check_maintenance_io_concurrency, NULL, NULL check_maintenance_io_concurrency, assign_maintenance_io_concurrency,
NULL
}, },
{ {
...@@ -12072,6 +12107,20 @@ check_client_connection_check_interval(int *newval, void **extra, GucSource sour ...@@ -12072,6 +12107,20 @@ check_client_connection_check_interval(int *newval, void **extra, GucSource sour
return true; return true;
} }
static void
assign_maintenance_io_concurrency(int newval, void *extra)
{
#ifdef USE_PREFETCH
/*
* Reconfigure recovery prefetching, because a setting it depends on
* changed.
*/
maintenance_io_concurrency = newval;
if (AmStartupProcess())
XLogPrefetchReconfigure();
#endif
}
static void static void
assign_pgstat_temp_directory(const char *newval, void *extra) assign_pgstat_temp_directory(const char *newval, void *extra)
{ {
......
...@@ -235,6 +235,12 @@ ...@@ -235,6 +235,12 @@
#checkpoint_flush_after = 0 # measured in pages, 0 disables #checkpoint_flush_after = 0 # measured in pages, 0 disables
#checkpoint_warning = 30s # 0 disables #checkpoint_warning = 30s # 0 disables
# - Prefetching during recovery -
#wal_decode_buffer_size = 512kB # lookahead window used for prefetching
#recovery_prefetch = off # prefetch pages referenced in the WAL?
#recovery_prefetch_fpw = off # even pages logged with full page?
# - Archiving - # - Archiving -
#archive_mode = off # enables archiving; off, on, or always #archive_mode = off # enables archiving; off, on, or always
......
...@@ -132,6 +132,7 @@ extern char *PrimaryConnInfo; ...@@ -132,6 +132,7 @@ extern char *PrimaryConnInfo;
extern char *PrimarySlotName; extern char *PrimarySlotName;
extern bool wal_receiver_create_temp_slot; extern bool wal_receiver_create_temp_slot;
extern bool track_wal_io_timing; extern bool track_wal_io_timing;
extern int wal_decode_buffer_size;
/* indirectly set via GUC system */ /* indirectly set via GUC system */
extern TransactionId recoveryTargetXid; extern TransactionId recoveryTargetXid;
......
/*-------------------------------------------------------------------------
*
* xlogprefetch.h
* Declarations for the recovery prefetching module.
*
* Portions Copyright (c) 2021, PostgreSQL Global Development Group
* Portions Copyright (c) 1994, Regents of the University of California
*
* IDENTIFICATION
* src/include/access/xlogprefetch.h
*-------------------------------------------------------------------------
*/
#ifndef XLOGPREFETCH_H
#define XLOGPREFETCH_H
#include "access/xlogdefs.h"
/* GUCs */
extern bool recovery_prefetch;
extern bool recovery_prefetch_fpw;
struct XLogPrefetcher;
typedef struct XLogPrefetcher XLogPrefetcher;
extern int XLogPrefetchReconfigureCount;
typedef struct XLogPrefetchState
{
XLogReaderState *reader;
XLogPrefetcher *prefetcher;
int reconfigure_count;
} XLogPrefetchState;
extern size_t XLogPrefetchShmemSize(void);
extern void XLogPrefetchShmemInit(void);
extern void XLogPrefetchReconfigure(void);
extern void XLogPrefetchRequestResetStats(void);
extern void XLogPrefetchBegin(XLogPrefetchState *state, XLogReaderState *reader);
extern void XLogPrefetchEnd(XLogPrefetchState *state);
/* Functions exposed only for the use of XLogPrefetch(). */
extern XLogPrefetcher *XLogPrefetcherAllocate(XLogReaderState *reader);
extern void XLogPrefetcherFree(XLogPrefetcher *prefetcher);
extern bool XLogPrefetcherReadAhead(XLogPrefetcher *prefetch,
XLogRecPtr replaying_lsn);
/*
* Tell the prefetching module that we are now replaying a given LSN, so that
* it can decide how far ahead to read in the WAL, if configured. Return
* true if more data is needed by the reader.
*/
static inline bool
XLogPrefetch(XLogPrefetchState *state, XLogRecPtr replaying_lsn)
{
/*
* Handle any configuration changes. Rather than trying to deal with
* various parameter changes, we just tear down and set up a new
* prefetcher if anything we depend on changes.
*/
if (unlikely(state->reconfigure_count != XLogPrefetchReconfigureCount))
{
/* If we had a prefetcher, tear it down. */
if (state->prefetcher)
{
XLogPrefetcherFree(state->prefetcher);
state->prefetcher = NULL;
}
/* If we want a prefetcher, set it up. */
if (recovery_prefetch)
state->prefetcher = XLogPrefetcherAllocate(state->reader);
state->reconfigure_count = XLogPrefetchReconfigureCount;
}
if (state->prefetcher)
return XLogPrefetcherReadAhead(state->prefetcher, replaying_lsn);
return false;
}
#endif
...@@ -39,6 +39,7 @@ ...@@ -39,6 +39,7 @@
#endif #endif
#include "access/xlogrecord.h" #include "access/xlogrecord.h"
#include "storage/buf.h"
/* WALOpenSegment represents a WAL segment being read. */ /* WALOpenSegment represents a WAL segment being read. */
typedef struct WALOpenSegment typedef struct WALOpenSegment
...@@ -77,6 +78,9 @@ typedef struct ...@@ -77,6 +78,9 @@ typedef struct
ForkNumber forknum; ForkNumber forknum;
BlockNumber blkno; BlockNumber blkno;
/* Workspace for remembering last known buffer holding this block. */
Buffer recent_buffer;
/* copy of the fork_flags field from the XLogRecordBlockHeader */ /* copy of the fork_flags field from the XLogRecordBlockHeader */
uint8 flags; uint8 flags;
...@@ -397,5 +401,8 @@ extern char *XLogRecGetBlockData(XLogReaderState *record, uint8 block_id, Size * ...@@ -397,5 +401,8 @@ extern char *XLogRecGetBlockData(XLogReaderState *record, uint8 block_id, Size *
extern bool XLogRecGetBlockTag(XLogReaderState *record, uint8 block_id, extern bool XLogRecGetBlockTag(XLogReaderState *record, uint8 block_id,
RelFileNode *rnode, ForkNumber *forknum, RelFileNode *rnode, ForkNumber *forknum,
BlockNumber *blknum); BlockNumber *blknum);
extern bool XLogRecGetRecentBuffer(XLogReaderState *record, uint8 block_id,
RelFileNode *rnode, ForkNumber *forknum,
BlockNumber *blknum, Buffer *recent_buffer);
#endif /* XLOGREADER_H */ #endif /* XLOGREADER_H */
...@@ -42,7 +42,8 @@ extern XLogRedoAction XLogReadBufferForRedoExtended(XLogReaderState *record, ...@@ -42,7 +42,8 @@ extern XLogRedoAction XLogReadBufferForRedoExtended(XLogReaderState *record,
Buffer *buf); Buffer *buf);
extern Buffer XLogReadBufferExtended(RelFileNode rnode, ForkNumber forknum, extern Buffer XLogReadBufferExtended(RelFileNode rnode, ForkNumber forknum,
BlockNumber blkno, ReadBufferMode mode); BlockNumber blkno, ReadBufferMode mode,
Buffer recent_buffer);
extern Relation CreateFakeRelcacheEntry(RelFileNode rnode); extern Relation CreateFakeRelcacheEntry(RelFileNode rnode);
extern void FreeFakeRelcacheEntry(Relation fakerel); extern void FreeFakeRelcacheEntry(Relation fakerel);
......
...@@ -6291,6 +6291,14 @@ ...@@ -6291,6 +6291,14 @@
prorettype => 'text', proargtypes => '', prorettype => 'text', proargtypes => '',
prosrc => 'pg_get_wal_replay_pause_state' }, prosrc => 'pg_get_wal_replay_pause_state' },
{ oid => '9085', descr => 'statistics: information about WAL prefetching',
proname => 'pg_stat_get_prefetch_recovery', prorows => '1', provolatile => 'v',
proretset => 't', prorettype => 'record', proargtypes => '',
proallargtypes => '{timestamptz,int8,int8,int8,int8,int8,int4,int4,float4,float4}',
proargmodes => '{o,o,o,o,o,o,o,o,o,o}',
proargnames => '{stats_reset,prefetch,skip_hit,skip_new,skip_fpw,skip_seq,distance,queue_depth,avg_distance,avg_queue_depth}',
prosrc => 'pg_stat_get_prefetch_recovery' },
{ oid => '2621', descr => 'reload configuration files', { oid => '2621', descr => 'reload configuration files',
proname => 'pg_reload_conf', provolatile => 'v', prorettype => 'bool', proname => 'pg_reload_conf', provolatile => 'v', prorettype => 'bool',
proargtypes => '', prosrc => 'pg_reload_conf' }, proargtypes => '', prosrc => 'pg_reload_conf' },
......
...@@ -74,6 +74,7 @@ typedef enum StatMsgType ...@@ -74,6 +74,7 @@ typedef enum StatMsgType
PGSTAT_MTYPE_BGWRITER, PGSTAT_MTYPE_BGWRITER,
PGSTAT_MTYPE_WAL, PGSTAT_MTYPE_WAL,
PGSTAT_MTYPE_SLRU, PGSTAT_MTYPE_SLRU,
PGSTAT_MTYPE_RECOVERYPREFETCH,
PGSTAT_MTYPE_FUNCSTAT, PGSTAT_MTYPE_FUNCSTAT,
PGSTAT_MTYPE_FUNCPURGE, PGSTAT_MTYPE_FUNCPURGE,
PGSTAT_MTYPE_RECOVERYCONFLICT, PGSTAT_MTYPE_RECOVERYCONFLICT,
...@@ -197,6 +198,19 @@ typedef struct PgStat_TableXactStatus ...@@ -197,6 +198,19 @@ typedef struct PgStat_TableXactStatus
struct PgStat_TableXactStatus *next; /* next of same subxact */ struct PgStat_TableXactStatus *next; /* next of same subxact */
} PgStat_TableXactStatus; } PgStat_TableXactStatus;
/*
* Recovery prefetching statistics persisted on disk by pgstat.c, but kept in
* shared memory by xlogprefetch.c.
*/
typedef struct PgStat_RecoveryPrefetchStats
{
PgStat_Counter prefetch;
PgStat_Counter skip_hit;
PgStat_Counter skip_new;
PgStat_Counter skip_fpw;
PgStat_Counter skip_seq;
TimestampTz stat_reset_timestamp;
} PgStat_RecoveryPrefetchStats;
/* ------------------------------------------------------------ /* ------------------------------------------------------------
* Message formats follow * Message formats follow
...@@ -536,6 +550,15 @@ typedef struct PgStat_MsgReplSlot ...@@ -536,6 +550,15 @@ typedef struct PgStat_MsgReplSlot
PgStat_Counter m_stream_bytes; PgStat_Counter m_stream_bytes;
} PgStat_MsgReplSlot; } PgStat_MsgReplSlot;
/* ----------
* PgStat_MsgRecoveryPrefetch Sent by XLogPrefetch to save statistics.
* ----------
*/
typedef struct PgStat_MsgRecoveryPrefetch
{
PgStat_MsgHdr m_hdr;
PgStat_RecoveryPrefetchStats m_stats;
} PgStat_MsgRecoveryPrefetch;
/* ---------- /* ----------
* PgStat_MsgRecoveryConflict Sent by the backend upon recovery conflict * PgStat_MsgRecoveryConflict Sent by the backend upon recovery conflict
...@@ -699,6 +722,7 @@ typedef union PgStat_Msg ...@@ -699,6 +722,7 @@ typedef union PgStat_Msg
PgStat_MsgBgWriter msg_bgwriter; PgStat_MsgBgWriter msg_bgwriter;
PgStat_MsgWal msg_wal; PgStat_MsgWal msg_wal;
PgStat_MsgSLRU msg_slru; PgStat_MsgSLRU msg_slru;
PgStat_MsgRecoveryPrefetch msg_recoveryprefetch;
PgStat_MsgFuncstat msg_funcstat; PgStat_MsgFuncstat msg_funcstat;
PgStat_MsgFuncpurge msg_funcpurge; PgStat_MsgFuncpurge msg_funcpurge;
PgStat_MsgRecoveryConflict msg_recoveryconflict; PgStat_MsgRecoveryConflict msg_recoveryconflict;
...@@ -1088,6 +1112,7 @@ extern void pgstat_twophase_postabort(TransactionId xid, uint16 info, ...@@ -1088,6 +1112,7 @@ extern void pgstat_twophase_postabort(TransactionId xid, uint16 info,
extern void pgstat_send_archiver(const char *xlog, bool failed); extern void pgstat_send_archiver(const char *xlog, bool failed);
extern void pgstat_send_bgwriter(void); extern void pgstat_send_bgwriter(void);
extern void pgstat_send_recoveryprefetch(PgStat_RecoveryPrefetchStats *stats);
extern void pgstat_report_wal(void); extern void pgstat_report_wal(void);
extern bool pgstat_send_wal(bool force); extern bool pgstat_send_wal(bool force);
...@@ -1104,6 +1129,7 @@ extern PgStat_GlobalStats *pgstat_fetch_global(void); ...@@ -1104,6 +1129,7 @@ extern PgStat_GlobalStats *pgstat_fetch_global(void);
extern PgStat_WalStats *pgstat_fetch_stat_wal(void); extern PgStat_WalStats *pgstat_fetch_stat_wal(void);
extern PgStat_SLRUStats *pgstat_fetch_slru(void); extern PgStat_SLRUStats *pgstat_fetch_slru(void);
extern PgStat_ReplSlotStats *pgstat_fetch_replslot(int *nslots_p); extern PgStat_ReplSlotStats *pgstat_fetch_replslot(int *nslots_p);
extern PgStat_RecoveryPrefetchStats *pgstat_fetch_recoveryprefetch(void);
extern void pgstat_count_slru_page_zeroed(int slru_idx); extern void pgstat_count_slru_page_zeroed(int slru_idx);
extern void pgstat_count_slru_page_hit(int slru_idx); extern void pgstat_count_slru_page_hit(int slru_idx);
......
...@@ -443,4 +443,8 @@ extern void assign_search_path(const char *newval, void *extra); ...@@ -443,4 +443,8 @@ extern void assign_search_path(const char *newval, void *extra);
extern bool check_wal_buffers(int *newval, void **extra, GucSource source); extern bool check_wal_buffers(int *newval, void **extra, GucSource source);
extern void assign_xlog_sync_method(int new_sync_method, void *extra); extern void assign_xlog_sync_method(int new_sync_method, void *extra);
/* in access/transam/xlogprefetch.c */
extern void assign_recovery_prefetch(bool new_value, void *extra);
extern void assign_recovery_prefetch_fpw(bool new_value, void *extra);
#endif /* GUC_H */ #endif /* GUC_H */
...@@ -1879,6 +1879,17 @@ pg_stat_gssapi| SELECT s.pid, ...@@ -1879,6 +1879,17 @@ pg_stat_gssapi| SELECT s.pid,
s.gss_enc AS encrypted s.gss_enc AS encrypted
FROM pg_stat_get_activity(NULL::integer) s(datid, pid, usesysid, application_name, state, query, wait_event_type, wait_event, xact_start, query_start, backend_start, state_change, client_addr, client_hostname, client_port, backend_xid, backend_xmin, backend_type, ssl, sslversion, sslcipher, sslbits, ssl_client_dn, ssl_client_serial, ssl_issuer_dn, gss_auth, gss_princ, gss_enc, leader_pid, queryid) FROM pg_stat_get_activity(NULL::integer) s(datid, pid, usesysid, application_name, state, query, wait_event_type, wait_event, xact_start, query_start, backend_start, state_change, client_addr, client_hostname, client_port, backend_xid, backend_xmin, backend_type, ssl, sslversion, sslcipher, sslbits, ssl_client_dn, ssl_client_serial, ssl_issuer_dn, gss_auth, gss_princ, gss_enc, leader_pid, queryid)
WHERE (s.client_port IS NOT NULL); WHERE (s.client_port IS NOT NULL);
pg_stat_prefetch_recovery| SELECT s.stats_reset,
s.prefetch,
s.skip_hit,
s.skip_new,
s.skip_fpw,
s.skip_seq,
s.distance,
s.queue_depth,
s.avg_distance,
s.avg_queue_depth
FROM pg_stat_get_prefetch_recovery() s(stats_reset, prefetch, skip_hit, skip_new, skip_fpw, skip_seq, distance, queue_depth, avg_distance, avg_queue_depth);
pg_stat_progress_analyze| SELECT s.pid, pg_stat_progress_analyze| SELECT s.pid,
s.datid, s.datid,
d.datname, d.datname,
......
...@@ -2803,6 +2803,10 @@ XLogPageHeader ...@@ -2803,6 +2803,10 @@ XLogPageHeader
XLogPageHeaderData XLogPageHeaderData
XLogPageReadCB XLogPageReadCB
XLogPageReadPrivate XLogPageReadPrivate
XLogPrefetcher
XLogPrefetcherFilter
XLogPrefetchState
XLogPrefetchStats
XLogReaderRoutine XLogReaderRoutine
XLogReaderState XLogReaderState
XLogRecData XLogRecData
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment