Commit 428b1d6b authored by Andres Freund's avatar Andres Freund

Allow to trigger kernel writeback after a configurable number of writes.

Currently writes to the main data files of postgres all go through the
OS page cache. This means that some operating systems can end up
collecting a large number of dirty buffers in their respective page
caches.  When these dirty buffers are flushed to storage rapidly, be it
because of fsync(), timeouts, or dirty ratios, latency for other reads
and writes can increase massively.  This is the primary reason for
regular massive stalls observed in real world scenarios and artificial
benchmarks; on rotating disks stalls on the order of hundreds of seconds
have been observed.

On linux it is possible to control this by reducing the global dirty
limits significantly, reducing the above problem. But global
configuration is rather problematic because it'll affect other
applications; also PostgreSQL itself doesn't always generally want this
behavior, e.g. for temporary files it's undesirable.

Several operating systems allow some control over the kernel page
cache. Linux has sync_file_range(2), several posix systems have msync(2)
and posix_fadvise(2). sync_file_range(2) is preferable because it
requires no special setup, whereas msync() requires the to-be-flushed
range to be mmap'ed. For the purpose of flushing dirty data
posix_fadvise(2) is the worst alternative, as flushing dirty data is
just a side-effect of POSIX_FADV_DONTNEED, which also removes the pages
from the page cache.  Thus the feature is enabled by default only on
linux, but can be enabled on all systems that have any of the above
APIs.

While desirable and likely possible this patch does not contain an
implementation for windows.

With the infrastructure added, writes made via checkpointer, bgwriter
and normal user backends can be flushed after a configurable number of
writes. Each of these sources of writes controlled by a separate GUC,
checkpointer_flush_after, bgwriter_flush_after and backend_flush_after
respectively; they're separate because the number of flushes that are
good are separate, and because the performance considerations of
controlled flushing for each of these are different.

A later patch will add checkpoint sorting - after that flushes from the
ckeckpoint will almost always be desirable. Bgwriter flushes are most of
the time going to be random, which are slow on lots of storage hardware.
Flushing in backends works well if the storage and bgwriter can keep up,
but if not it can have negative consequences.  This patch is likely to
have negative performance consequences without checkpoint sorting, but
unfortunately so has sorting without flush control.

Discussion: alpine.DEB.2.10.1506011320000.28433@sto
Author: Fabien Coelho and Andres Freund
parent c82c92b1
...@@ -1843,6 +1843,35 @@ include_dir 'conf.d' ...@@ -1843,6 +1843,35 @@ include_dir 'conf.d'
</para> </para>
</listitem> </listitem>
</varlistentry> </varlistentry>
<varlistentry id="guc-bgwriter-flush-after" xreflabel="bgwriter_flush_after">
<term><varname>bgwriter_flush_after</varname> (<type>int</type>)
<indexterm>
<primary><varname>bgwriter_flush_after</> configuration parameter</primary>
</indexterm>
</term>
<listitem>
<para>
Whenever more than <varname>bgwriter_flush_after</varname> bytes have
been written by the bgwriter, attempt to force the OS to issue these
writes to the underlying storage. Doing so will limit the amount of
dirty data in the kernel's page cache, reducing the likelihood of
stalls when an fsync is issued at the end of a checkpoint, or when
the OS writes data back in larger batches in the background. Often
that will result in greatly reduced transaction latency, but there
also are some cases, especially with workloads that are bigger than
<xref linkend="guc-shared-buffers">, but smaller than the OS's page
cache, where performance might degrade. This setting may have no
effect on some platforms. The valid range is between
<literal>0</literal>, which disables controlled writeback, and
<literal>2MB</literal>. The default is <literal>512Kb</> on Linux,
<literal>0</> elsewhere. (Non-default values of
<symbol>BLCKSZ</symbol> change the default and maximum.)
This parameter can only be set in the <filename>postgresql.conf</>
file or on the server command line.
</para>
</listitem>
</varlistentry>
</variablelist> </variablelist>
<para> <para>
...@@ -1944,6 +1973,35 @@ include_dir 'conf.d' ...@@ -1944,6 +1973,35 @@ include_dir 'conf.d'
</para> </para>
</listitem> </listitem>
</varlistentry> </varlistentry>
<varlistentry id="guc-backend-flush-after" xreflabel="backend_flush_after">
<term><varname>backend_flush_after</varname> (<type>int</type>)
<indexterm>
<primary><varname>backend_flush_after</> configuration parameter</primary>
</indexterm>
</term>
<listitem>
<para>
Whenever more than <varname>backend_flush_after</varname> bytes have
been written by a single backend, attempt to force the OS to issue
these writes to the underlying storage. Doing so will limit the
amount of dirty data in the kernel's page cache, reducing the
likelihood of stalls when an fsync is issued at the end of a
checkpoint, or when the OS writes data back in larger batches in the
background. Often that will result in greatly reduced transaction
latency, but there also are some cases, especially with workloads
that are bigger than <xref linkend="guc-shared-buffers">, but smaller
than the OS's page cache, where performance might degrade. This
setting may have no effect on some platforms. The valid range is
between <literal>0</literal>, which disables controlled writeback,
and <literal>2MB</literal>. The default is <literal>128Kb</> on
Linux, <literal>0</> elsewhere. (Non-default values of
<symbol>BLCKSZ</symbol> change the default and maximum.)
This parameter can only be set in the <filename>postgresql.conf</>
file or on the server command line.
</para>
</listitem>
</varlistentry>
</variablelist> </variablelist>
</sect2> </sect2>
</sect1> </sect1>
...@@ -2475,6 +2533,35 @@ include_dir 'conf.d' ...@@ -2475,6 +2533,35 @@ include_dir 'conf.d'
</listitem> </listitem>
</varlistentry> </varlistentry>
<varlistentry id="guc-checkpoint-flush-after" xreflabel="checkpoint_flush_after">
<term><varname>checkpoint_flush_after</varname> (<type>int</type>)
<indexterm>
<primary><varname>checkpoint_flush_after</> configuration parameter</primary>
</indexterm>
</term>
<listitem>
<para>
Whenever more than <varname>checkpoint_flush_after</varname> bytes
have been written while performing a checkpoint, attempt to force the
OS to issue these writes to the underlying storage. Doing so will
limit the amount of dirty data in the kernel's page cache, reducing
the likelihood of stalls when an fsync is issued at the end of the
checkpoint, or when the OS writes data back in larger batches in the
background. Often that will result in greatly reduced transaction
latency, but there also are some cases, especially with workloads
that are bigger than <xref linkend="guc-shared-buffers">, but smaller
than the OS's page cache, where performance might degrade. This
setting may have no effect on some platforms. The valid range is
between <literal>0</literal>, which disables controlled writeback,
and <literal>2MB</literal>. The default is <literal>128Kb</> on
Linux, <literal>0</> elsewhere. (Non-default values of
<symbol>BLCKSZ</symbol> change the default and maximum.)
This parameter can only be set in the <filename>postgresql.conf</>
file or on the server command line.
</para>
</listitem>
</varlistentry>
<varlistentry id="guc-checkpoint-warning" xreflabel="checkpoint_warning"> <varlistentry id="guc-checkpoint-warning" xreflabel="checkpoint_warning">
<term><varname>checkpoint_warning</varname> (<type>integer</type>) <term><varname>checkpoint_warning</varname> (<type>integer</type>)
<indexterm> <indexterm>
......
...@@ -545,6 +545,17 @@ ...@@ -545,6 +545,17 @@
unexpected variation in the number of WAL segments needed. unexpected variation in the number of WAL segments needed.
</para> </para>
<para>
On Linux and POSIX platforms <xref linkend="guc-checkpoint-flush-after">
allows to force the OS that pages written by the checkpoint should be
flushed to disk after a configurable number of bytes. Otherwise, these
pages may be kept in the OS's page cache, inducing a stall when
<literal>fsync</> is issued at the end of a checkpoint. This setting will
often help to reduce transaction latency, but it also can an adverse effect
on performance; particularly for workloads that are bigger than
<xref linkend="guc-shared-buffers">, but smaller than the OS's page cache.
</para>
<para> <para>
The number of WAL segment files in <filename>pg_xlog</> directory depends on The number of WAL segment files in <filename>pg_xlog</> directory depends on
<varname>min_wal_size</>, <varname>max_wal_size</> and <varname>min_wal_size</>, <varname>max_wal_size</> and
......
...@@ -111,6 +111,7 @@ BackgroundWriterMain(void) ...@@ -111,6 +111,7 @@ BackgroundWriterMain(void)
sigjmp_buf local_sigjmp_buf; sigjmp_buf local_sigjmp_buf;
MemoryContext bgwriter_context; MemoryContext bgwriter_context;
bool prev_hibernate; bool prev_hibernate;
WritebackContext wb_context;
/* /*
* Properly accept or ignore signals the postmaster might send us. * Properly accept or ignore signals the postmaster might send us.
...@@ -164,6 +165,8 @@ BackgroundWriterMain(void) ...@@ -164,6 +165,8 @@ BackgroundWriterMain(void)
ALLOCSET_DEFAULT_MAXSIZE); ALLOCSET_DEFAULT_MAXSIZE);
MemoryContextSwitchTo(bgwriter_context); MemoryContextSwitchTo(bgwriter_context);
WritebackContextInit(&wb_context, &bgwriter_flush_after);
/* /*
* If an exception is encountered, processing resumes here. * If an exception is encountered, processing resumes here.
* *
...@@ -208,6 +211,9 @@ BackgroundWriterMain(void) ...@@ -208,6 +211,9 @@ BackgroundWriterMain(void)
/* Flush any leaked data in the top-level context */ /* Flush any leaked data in the top-level context */
MemoryContextResetAndDeleteChildren(bgwriter_context); MemoryContextResetAndDeleteChildren(bgwriter_context);
/* re-initilialize to avoid repeated errors causing problems */
WritebackContextInit(&wb_context, &bgwriter_flush_after);
/* Now we can allow interrupts again */ /* Now we can allow interrupts again */
RESUME_INTERRUPTS(); RESUME_INTERRUPTS();
...@@ -272,7 +278,7 @@ BackgroundWriterMain(void) ...@@ -272,7 +278,7 @@ BackgroundWriterMain(void)
/* /*
* Do one cycle of dirty-buffer writing. * Do one cycle of dirty-buffer writing.
*/ */
can_hibernate = BgBufferSync(); can_hibernate = BgBufferSync(&wb_context);
/* /*
* Send off activity statistics to the stats collector * Send off activity statistics to the stats collector
......
...@@ -23,6 +23,7 @@ char *BufferBlocks; ...@@ -23,6 +23,7 @@ char *BufferBlocks;
LWLockMinimallyPadded *BufferIOLWLockArray = NULL; LWLockMinimallyPadded *BufferIOLWLockArray = NULL;
LWLockTranche BufferIOLWLockTranche; LWLockTranche BufferIOLWLockTranche;
LWLockTranche BufferContentLWLockTranche; LWLockTranche BufferContentLWLockTranche;
WritebackContext BackendWritebackContext;
/* /*
...@@ -149,6 +150,10 @@ InitBufferPool(void) ...@@ -149,6 +150,10 @@ InitBufferPool(void)
/* Init other shared buffer-management stuff */ /* Init other shared buffer-management stuff */
StrategyInitialize(!foundDescs); StrategyInitialize(!foundDescs);
/* Initialize per-backend file flush context */
WritebackContextInit(&BackendWritebackContext,
&backend_flush_after);
} }
/* /*
......
...@@ -82,6 +82,14 @@ double bgwriter_lru_multiplier = 2.0; ...@@ -82,6 +82,14 @@ double bgwriter_lru_multiplier = 2.0;
bool track_io_timing = false; bool track_io_timing = false;
int effective_io_concurrency = 0; int effective_io_concurrency = 0;
/*
* GUC variables about triggering kernel writeback for buffers written; OS
* dependant defaults are set via the GUC mechanism.
*/
int checkpoint_flush_after = 0;
int bgwriter_flush_after = 0;
int backend_flush_after = 0;
/* /*
* How many buffers PrefetchBuffer callers should try to stay ahead of their * How many buffers PrefetchBuffer callers should try to stay ahead of their
* ReadBuffer calls by. This is maintained by the assign hook for * ReadBuffer calls by. This is maintained by the assign hook for
...@@ -399,7 +407,7 @@ static bool PinBuffer(BufferDesc *buf, BufferAccessStrategy strategy); ...@@ -399,7 +407,7 @@ static bool PinBuffer(BufferDesc *buf, BufferAccessStrategy strategy);
static void PinBuffer_Locked(BufferDesc *buf); static void PinBuffer_Locked(BufferDesc *buf);
static void UnpinBuffer(BufferDesc *buf, bool fixOwner); static void UnpinBuffer(BufferDesc *buf, bool fixOwner);
static void BufferSync(int flags); static void BufferSync(int flags);
static int SyncOneBuffer(int buf_id, bool skip_recently_used); static int SyncOneBuffer(int buf_id, bool skip_recently_used, WritebackContext *flush_context);
static void WaitIO(BufferDesc *buf); static void WaitIO(BufferDesc *buf);
static bool StartBufferIO(BufferDesc *buf, bool forInput); static bool StartBufferIO(BufferDesc *buf, bool forInput);
static void TerminateBufferIO(BufferDesc *buf, bool clear_dirty, static void TerminateBufferIO(BufferDesc *buf, bool clear_dirty,
...@@ -416,6 +424,7 @@ static void FlushBuffer(BufferDesc *buf, SMgrRelation reln); ...@@ -416,6 +424,7 @@ static void FlushBuffer(BufferDesc *buf, SMgrRelation reln);
static void AtProcExit_Buffers(int code, Datum arg); static void AtProcExit_Buffers(int code, Datum arg);
static void CheckForBufferLeaks(void); static void CheckForBufferLeaks(void);
static int rnode_comparator(const void *p1, const void *p2); static int rnode_comparator(const void *p1, const void *p2);
static int buffertag_comparator(const void *p1, const void *p2);
/* /*
...@@ -818,6 +827,13 @@ ReadBuffer_common(SMgrRelation smgr, char relpersistence, ForkNumber forkNum, ...@@ -818,6 +827,13 @@ ReadBuffer_common(SMgrRelation smgr, char relpersistence, ForkNumber forkNum,
MemSet((char *) bufBlock, 0, BLCKSZ); MemSet((char *) bufBlock, 0, BLCKSZ);
/* don't set checksum for all-zero page */ /* don't set checksum for all-zero page */
smgrextend(smgr, forkNum, blockNum, (char *) bufBlock, false); smgrextend(smgr, forkNum, blockNum, (char *) bufBlock, false);
/*
* NB: we're *not* doing a ScheduleBufferTagForWriteback here;
* although we're essentially performing a write. At least on linux
* doing so defeats the 'delayed allocation' mechanism, leading to
* increased file fragmentation.
*/
} }
else else
{ {
...@@ -1084,6 +1100,9 @@ BufferAlloc(SMgrRelation smgr, char relpersistence, ForkNumber forkNum, ...@@ -1084,6 +1100,9 @@ BufferAlloc(SMgrRelation smgr, char relpersistence, ForkNumber forkNum,
FlushBuffer(buf, NULL); FlushBuffer(buf, NULL);
LWLockRelease(BufferDescriptorGetContentLock(buf)); LWLockRelease(BufferDescriptorGetContentLock(buf));
ScheduleBufferTagForWriteback(&BackendWritebackContext,
&buf->tag);
TRACE_POSTGRESQL_BUFFER_WRITE_DIRTY_DONE(forkNum, blockNum, TRACE_POSTGRESQL_BUFFER_WRITE_DIRTY_DONE(forkNum, blockNum,
smgr->smgr_rnode.node.spcNode, smgr->smgr_rnode.node.spcNode,
smgr->smgr_rnode.node.dbNode, smgr->smgr_rnode.node.dbNode,
...@@ -1642,6 +1661,7 @@ BufferSync(int flags) ...@@ -1642,6 +1661,7 @@ BufferSync(int flags)
int num_to_write; int num_to_write;
int num_written; int num_written;
int mask = BM_DIRTY; int mask = BM_DIRTY;
WritebackContext wb_context;
/* Make sure we can handle the pin inside SyncOneBuffer */ /* Make sure we can handle the pin inside SyncOneBuffer */
ResourceOwnerEnlargeBuffers(CurrentResourceOwner); ResourceOwnerEnlargeBuffers(CurrentResourceOwner);
...@@ -1694,6 +1714,8 @@ BufferSync(int flags) ...@@ -1694,6 +1714,8 @@ BufferSync(int flags)
if (num_to_write == 0) if (num_to_write == 0)
return; /* nothing to do */ return; /* nothing to do */
WritebackContextInit(&wb_context, &checkpoint_flush_after);
TRACE_POSTGRESQL_BUFFER_SYNC_START(NBuffers, num_to_write); TRACE_POSTGRESQL_BUFFER_SYNC_START(NBuffers, num_to_write);
/* /*
...@@ -1725,7 +1747,7 @@ BufferSync(int flags) ...@@ -1725,7 +1747,7 @@ BufferSync(int flags)
*/ */
if (bufHdr->flags & BM_CHECKPOINT_NEEDED) if (bufHdr->flags & BM_CHECKPOINT_NEEDED)
{ {
if (SyncOneBuffer(buf_id, false) & BUF_WRITTEN) if (SyncOneBuffer(buf_id, false, &wb_context) & BUF_WRITTEN)
{ {
TRACE_POSTGRESQL_BUFFER_SYNC_WRITTEN(buf_id); TRACE_POSTGRESQL_BUFFER_SYNC_WRITTEN(buf_id);
BgWriterStats.m_buf_written_checkpoints++; BgWriterStats.m_buf_written_checkpoints++;
...@@ -1756,6 +1778,9 @@ BufferSync(int flags) ...@@ -1756,6 +1778,9 @@ BufferSync(int flags)
buf_id = 0; buf_id = 0;
} }
/* issue all pending flushes */
IssuePendingWritebacks(&wb_context);
/* /*
* Update checkpoint statistics. As noted above, this doesn't include * Update checkpoint statistics. As noted above, this doesn't include
* buffers written by other backends or bgwriter scan. * buffers written by other backends or bgwriter scan.
...@@ -1777,7 +1802,7 @@ BufferSync(int flags) ...@@ -1777,7 +1802,7 @@ BufferSync(int flags)
* bgwriter_lru_maxpages to 0.) * bgwriter_lru_maxpages to 0.)
*/ */
bool bool
BgBufferSync(void) BgBufferSync(WritebackContext *wb_context)
{ {
/* info obtained from freelist.c */ /* info obtained from freelist.c */
int strategy_buf_id; int strategy_buf_id;
...@@ -2002,7 +2027,8 @@ BgBufferSync(void) ...@@ -2002,7 +2027,8 @@ BgBufferSync(void)
/* Execute the LRU scan */ /* Execute the LRU scan */
while (num_to_scan > 0 && reusable_buffers < upcoming_alloc_est) while (num_to_scan > 0 && reusable_buffers < upcoming_alloc_est)
{ {
int buffer_state = SyncOneBuffer(next_to_clean, true); int buffer_state = SyncOneBuffer(next_to_clean, true,
wb_context);
if (++next_to_clean >= NBuffers) if (++next_to_clean >= NBuffers)
{ {
...@@ -2079,10 +2105,11 @@ BgBufferSync(void) ...@@ -2079,10 +2105,11 @@ BgBufferSync(void)
* Note: caller must have done ResourceOwnerEnlargeBuffers. * Note: caller must have done ResourceOwnerEnlargeBuffers.
*/ */
static int static int
SyncOneBuffer(int buf_id, bool skip_recently_used) SyncOneBuffer(int buf_id, bool skip_recently_used, WritebackContext *wb_context)
{ {
BufferDesc *bufHdr = GetBufferDescriptor(buf_id); BufferDesc *bufHdr = GetBufferDescriptor(buf_id);
int result = 0; int result = 0;
BufferTag tag;
ReservePrivateRefCountEntry(); ReservePrivateRefCountEntry();
...@@ -2123,8 +2150,13 @@ SyncOneBuffer(int buf_id, bool skip_recently_used) ...@@ -2123,8 +2150,13 @@ SyncOneBuffer(int buf_id, bool skip_recently_used)
FlushBuffer(bufHdr, NULL); FlushBuffer(bufHdr, NULL);
LWLockRelease(BufferDescriptorGetContentLock(bufHdr)); LWLockRelease(BufferDescriptorGetContentLock(bufHdr));
tag = bufHdr->tag;
UnpinBuffer(bufHdr, true); UnpinBuffer(bufHdr, true);
ScheduleBufferTagForWriteback(wb_context, &tag);
return result | BUF_WRITTEN; return result | BUF_WRITTEN;
} }
...@@ -3729,3 +3761,154 @@ rnode_comparator(const void *p1, const void *p2) ...@@ -3729,3 +3761,154 @@ rnode_comparator(const void *p1, const void *p2)
else else
return 0; return 0;
} }
/*
* BufferTag comparator.
*/
static int
buffertag_comparator(const void *a, const void *b)
{
const BufferTag *ba = (const BufferTag *) a;
const BufferTag *bb = (const BufferTag *) b;
int ret;
ret = rnode_comparator(&ba->rnode, &bb->rnode);
if (ret != 0)
return ret;
if (ba->forkNum < bb->forkNum)
return -1;
if (ba->forkNum > bb->forkNum)
return 1;
if (ba->blockNum < bb->blockNum)
return -1;
if (ba->blockNum > bb->blockNum)
return 1;
return 0;
}
/*
* Initialize a writeback context, discarding potential previous state.
*
* *max_coalesce is a pointer to a variable containing the current maximum
* number of writeback requests that will be coalesced into a bigger one. A
* value <= 0 means that no writeback control will be performed. max_pending
* is a pointer instead of an immediate value, so the coalesce limits can
* easily changed by the GUC mechanism, and so calling code does not have to
* check the current configuration.
*/
void
WritebackContextInit(WritebackContext *context, int *max_pending)
{
Assert(*max_pending <= WRITEBACK_MAX_PENDING_FLUSHES);
context->max_pending = max_pending;
context->nr_pending = 0;
}
/*
* Add buffer to list of pending writeback requests.
*/
void
ScheduleBufferTagForWriteback(WritebackContext *context, BufferTag *tag)
{
PendingWriteback *pending;
/*
* Add buffer to the pending writeback array, unless writeback control is
* disabled.
*/
if (*context->max_pending > 0)
{
Assert(*context->max_pending <= WRITEBACK_MAX_PENDING_FLUSHES);
pending = &context->pending_writebacks[context->nr_pending++];
pending->tag = *tag;
}
/*
* Perform pending flushes if the writeback limit is exceeded. This
* includes the case where previously an item has been added, but control
* is now disabled.
*/
if (context->nr_pending >= *context->max_pending)
IssuePendingWritebacks(context);
}
/*
* Issue all pending writeback requests, previously scheduled with
* ScheduleBufferTagForWriteback, to the OS.
*
* Because this is only used to improve the OSs IO scheduling we try to never
* error out - it's just a hint.
*/
void
IssuePendingWritebacks(WritebackContext *context)
{
int i;
if (context->nr_pending == 0)
return;
/*
* Executing the writes in-order can make them a lot faster, and allows to
* merge writeback requests to consecutive blocks into larger writebacks.
*/
qsort(&context->pending_writebacks, context->nr_pending,
sizeof(PendingWriteback), buffertag_comparator);
/*
* Coalesce neighbouring writes, but nothing else. For that we iterate
* through the, now sorted, array of pending flushes, and look forward to
* find all neighbouring (or identical) writes.
*/
for (i = 0; i < context->nr_pending; i++)
{
PendingWriteback *cur;
PendingWriteback *next;
SMgrRelation reln;
int ahead;
BufferTag tag;
Size nblocks = 1;
cur = &context->pending_writebacks[i];
tag = cur->tag;
/*
* Peek ahead, into following writeback requests, to see if they can
* be combined with the current one.
*/
for (ahead = 0; i + ahead + 1 < context->nr_pending; ahead++)
{
next = &context->pending_writebacks[i + ahead + 1];
/* different file, stop */
if (!RelFileNodeEquals(cur->tag.rnode, next->tag.rnode) ||
cur->tag.forkNum != next->tag.forkNum)
break;
/* ok, block queued twice, skip */
if (cur->tag.blockNum == next->tag.blockNum)
continue;
/* only merge consecutive writes */
if (cur->tag.blockNum + 1 != next->tag.blockNum)
break;
nblocks++;
cur = next;
}
i += ahead;
/* and finally tell the kernel to write the data to storage */
reln = smgropen(tag.rnode, InvalidBackendId);
smgrwriteback(reln, tag.forkNum, tag.blockNum, nblocks);
}
context->nr_pending = 0;
}
...@@ -190,9 +190,9 @@ copy_file(char *fromfile, char *tofile) ...@@ -190,9 +190,9 @@ copy_file(char *fromfile, char *tofile)
/* /*
* We fsync the files later but first flush them to avoid spamming the * We fsync the files later but first flush them to avoid spamming the
* cache and hopefully get the kernel to start writing them out before * cache and hopefully get the kernel to start writing them out before
* the fsync comes. Ignore any error, since it's only a hint. * the fsync comes.
*/ */
(void) pg_flush_data(dstfd, offset, nbytes); pg_flush_data(dstfd, offset, nbytes);
} }
if (CloseTransientFile(dstfd)) if (CloseTransientFile(dstfd))
......
...@@ -61,6 +61,9 @@ ...@@ -61,6 +61,9 @@
#include <sys/file.h> #include <sys/file.h>
#include <sys/param.h> #include <sys/param.h>
#include <sys/stat.h> #include <sys/stat.h>
#ifndef WIN32
#include <sys/mman.h>
#endif
#include <unistd.h> #include <unistd.h>
#include <fcntl.h> #include <fcntl.h>
#ifdef HAVE_SYS_RESOURCE_H #ifdef HAVE_SYS_RESOURCE_H
...@@ -82,6 +85,8 @@ ...@@ -82,6 +85,8 @@
/* Define PG_FLUSH_DATA_WORKS if we have an implementation for pg_flush_data */ /* Define PG_FLUSH_DATA_WORKS if we have an implementation for pg_flush_data */
#if defined(HAVE_SYNC_FILE_RANGE) #if defined(HAVE_SYNC_FILE_RANGE)
#define PG_FLUSH_DATA_WORKS 1 #define PG_FLUSH_DATA_WORKS 1
#elif !defined(WIN32) && defined(MS_ASYNC)
#define PG_FLUSH_DATA_WORKS 1
#elif defined(USE_POSIX_FADVISE) && defined(POSIX_FADV_DONTNEED) #elif defined(USE_POSIX_FADVISE) && defined(POSIX_FADV_DONTNEED)
#define PG_FLUSH_DATA_WORKS 1 #define PG_FLUSH_DATA_WORKS 1
#endif #endif
...@@ -383,29 +388,126 @@ pg_fdatasync(int fd) ...@@ -383,29 +388,126 @@ pg_fdatasync(int fd)
} }
/* /*
* pg_flush_data --- advise OS that the data described won't be needed soon * pg_flush_data --- advise OS that the described dirty data should be flushed
* *
* Not all platforms have sync_file_range or posix_fadvise; treat as no-op * An offset of 0 with an nbytes 0 means that the entire file should be
* if not available. Also, treat as no-op if enableFsync is off; this is * flushed.
* because the call isn't free, and some platforms such as Linux will actually
* block the requestor until the write is scheduled.
*/ */
int void
pg_flush_data(int fd, off_t offset, off_t amount) pg_flush_data(int fd, off_t offset, off_t nbytes)
{ {
#ifdef PG_FLUSH_DATA_WORKS /*
if (enableFsync) * Right now file flushing is primarily used to avoid making later
{ * fsync()/fdatasync() calls have a less impact. Thus don't trigger
* flushes if fsyncs are disabled - that's a decision we might want to
* make configurable at some point.
*/
if (!enableFsync)
return;
/*
* XXX: compile all alternatives, to find portability problems more easily
*/
#if defined(HAVE_SYNC_FILE_RANGE) #if defined(HAVE_SYNC_FILE_RANGE)
return sync_file_range(fd, offset, amount, SYNC_FILE_RANGE_WRITE); {
#elif defined(USE_POSIX_FADVISE) && defined(POSIX_FADV_DONTNEED) int rc = 0;
return posix_fadvise(fd, offset, amount, POSIX_FADV_DONTNEED);
#else /*
#error PG_FLUSH_DATA_WORKS should not have been defined * sync_file_range(SYNC_FILE_RANGE_WRITE), currently linux specific,
* tells the OS that writeback for the passed in blocks should be
* started, but that we don't want to wait for completion. Note that
* this call might block if too much dirty data exists in the range.
* This is the preferrable method on OSs supporting it, as it works
* reliably when available (contrast to msync()) and doesn't flush out
* clean data (like FADV_DONTNEED).
*/
rc = sync_file_range(fd, offset, nbytes,
SYNC_FILE_RANGE_WRITE);
/* don't error out, this is just a performance optimization */
if (rc != 0)
{
ereport(WARNING,
(errcode_for_file_access(),
errmsg("could not flush dirty data: %m")));
}
return;
}
#endif #endif
#if !defined(WIN32) && defined(MS_ASYNC)
{
int rc = 0;
void *p;
/*
* On several OSs msync(MS_ASYNC) on a mmap'ed file triggers
* writeback. On linux it only does so with MS_SYNC is specified, but
* then it does the writeback synchronously. Luckily all common linux
* systems have sync_file_range(). This is preferrable over
* FADV_DONTNEED because it doesn't flush out clean data.
*
* We map the file (mmap()), tell the kernel to sync back the contents
* (msync()), and then remove the mapping again (munmap()).
*/
p = mmap(NULL, nbytes,
PROT_READ | PROT_WRITE, MAP_SHARED,
fd, offset);
if (p == MAP_FAILED)
{
ereport(WARNING,
(errcode_for_file_access(),
errmsg("could not mmap while flushing dirty data: %m")));
return;
}
rc = msync(p, nbytes, MS_ASYNC);
if (rc != 0)
{
ereport(WARNING,
(errcode_for_file_access(),
errmsg("could not flush dirty data: %m")));
/* NB: need to fall through to munmap()! */
}
rc = munmap(p, nbytes);
if (rc != 0)
{
/* FATAL error because mapping would remain */
ereport(FATAL,
(errcode_for_file_access(),
errmsg("could not munmap while flushing blocks: %m")));
}
return;
}
#endif
#if defined(USE_POSIX_FADVISE) && defined(POSIX_FADV_DONTNEED)
{
int rc = 0;
/*
* Signal the kernel that the passed in range should not be cached
* anymore. This has the, desired, side effect of writing out dirty
* data, and the, undesired, side effect of likely discarding useful
* clean cached blocks. For the latter reason this is the least
* preferrable method.
*/
rc = posix_fadvise(fd, offset, nbytes, POSIX_FADV_DONTNEED);
/* don't error out, this is just a performance optimization */
if (rc != 0)
{
ereport(WARNING,
(errcode_for_file_access(),
errmsg("could not flush dirty data: %m")));
return;
}
return;
} }
#endif #endif
return 0;
} }
...@@ -1396,6 +1498,24 @@ FilePrefetch(File file, off_t offset, int amount) ...@@ -1396,6 +1498,24 @@ FilePrefetch(File file, off_t offset, int amount)
#endif #endif
} }
void
FileWriteback(File file, off_t offset, int amount)
{
int returnCode;
Assert(FileIsValid(file));
DO_DB(elog(LOG, "FileWriteback: %d (%s) " INT64_FORMAT " %d",
file, VfdCache[file].fileName,
(int64) offset, amount));
returnCode = FileAccess(file);
if (returnCode < 0)
return;
pg_flush_data(VfdCache[file].fd, offset, amount);
}
int int
FileRead(File file, char *buffer, int amount) FileRead(File file, char *buffer, int amount)
{ {
...@@ -2796,9 +2916,10 @@ pre_sync_fname(const char *fname, bool isdir, int elevel) ...@@ -2796,9 +2916,10 @@ pre_sync_fname(const char *fname, bool isdir, int elevel)
} }
/* /*
* We ignore errors from pg_flush_data() because this is only a hint. * pg_flush_data() ignores errors, which is ok because this is only a
* hint.
*/ */
(void) pg_flush_data(fd, 0, 0); pg_flush_data(fd, 0, 0);
(void) CloseTransientFile(fd); (void) CloseTransientFile(fd);
} }
......
...@@ -662,6 +662,56 @@ mdprefetch(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum) ...@@ -662,6 +662,56 @@ mdprefetch(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum)
#endif /* USE_PREFETCH */ #endif /* USE_PREFETCH */
} }
/*
* mdwriteback() -- Tell the kernel to write pages back to storage.
*
* This accepts a range of blocks because flushing several pages at once is
* considerably more efficient than doing so individually.
*/
void
mdwriteback(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum, int nblocks)
{
/*
* Issue flush requests in as few requests as possible; have to split at
* segment boundaries though, since those are actually separate files.
*/
while (nblocks != 0)
{
int nflush = nblocks;
off_t seekpos;
MdfdVec *v;
int segnum_start,
segnum_end;
v = _mdfd_getseg(reln, forknum, blocknum, false,
EXTENSION_RETURN_NULL);
/*
* We might be flushing buffers of already removed relations, that's
* ok, just ignore that case.
*/
if (!v)
return;
/* compute offset inside the current segment */
segnum_start = blocknum / RELSEG_SIZE;
/* compute number of desired writes within the current segment */
segnum_end = (blocknum + nblocks - 1) / RELSEG_SIZE;
if (segnum_start != segnum_end)
nflush = RELSEG_SIZE - (blocknum % ((BlockNumber) RELSEG_SIZE));
Assert(nflush >= 1);
Assert(nflush <= nblocks);
seekpos = (off_t) BLCKSZ *(blocknum % ((BlockNumber) RELSEG_SIZE));
FileWriteback(v->mdfd_vfd, seekpos, BLCKSZ * nflush);
nblocks -= nflush;
blocknum += nflush;
}
}
/* /*
* mdread() -- Read the specified block from a relation. * mdread() -- Read the specified block from a relation.
......
...@@ -53,6 +53,8 @@ typedef struct f_smgr ...@@ -53,6 +53,8 @@ typedef struct f_smgr
BlockNumber blocknum, char *buffer); BlockNumber blocknum, char *buffer);
void (*smgr_write) (SMgrRelation reln, ForkNumber forknum, void (*smgr_write) (SMgrRelation reln, ForkNumber forknum,
BlockNumber blocknum, char *buffer, bool skipFsync); BlockNumber blocknum, char *buffer, bool skipFsync);
void (*smgr_writeback) (SMgrRelation reln, ForkNumber forknum,
BlockNumber blocknum, int nblocks);
BlockNumber (*smgr_nblocks) (SMgrRelation reln, ForkNumber forknum); BlockNumber (*smgr_nblocks) (SMgrRelation reln, ForkNumber forknum);
void (*smgr_truncate) (SMgrRelation reln, ForkNumber forknum, void (*smgr_truncate) (SMgrRelation reln, ForkNumber forknum,
BlockNumber nblocks); BlockNumber nblocks);
...@@ -66,8 +68,8 @@ typedef struct f_smgr ...@@ -66,8 +68,8 @@ typedef struct f_smgr
static const f_smgr smgrsw[] = { static const f_smgr smgrsw[] = {
/* magnetic disk */ /* magnetic disk */
{mdinit, NULL, mdclose, mdcreate, mdexists, mdunlink, mdextend, {mdinit, NULL, mdclose, mdcreate, mdexists, mdunlink, mdextend,
mdprefetch, mdread, mdwrite, mdnblocks, mdtruncate, mdimmedsync, mdprefetch, mdread, mdwrite, mdwriteback, mdnblocks, mdtruncate,
mdpreckpt, mdsync, mdpostckpt mdimmedsync, mdpreckpt, mdsync, mdpostckpt
} }
}; };
...@@ -649,6 +651,19 @@ smgrwrite(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum, ...@@ -649,6 +651,19 @@ smgrwrite(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum,
buffer, skipFsync); buffer, skipFsync);
} }
/*
* smgrwriteback() -- Trigger kernel writeback for the supplied range of
* blocks.
*/
void
smgrwriteback(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum,
int nblocks)
{
(*(smgrsw[reln->smgr_which].smgr_writeback)) (reln, forknum, blocknum,
nblocks);
}
/* /*
* smgrnblocks() -- Calculate the number of blocks in the * smgrnblocks() -- Calculate the number of blocks in the
* supplied relation. * supplied relation.
......
...@@ -2384,6 +2384,42 @@ static struct config_int ConfigureNamesInt[] = ...@@ -2384,6 +2384,42 @@ static struct config_int ConfigureNamesInt[] =
check_effective_io_concurrency, assign_effective_io_concurrency, NULL check_effective_io_concurrency, assign_effective_io_concurrency, NULL
}, },
{
{"checkpoint_flush_after", PGC_SIGHUP, RESOURCES_ASYNCHRONOUS,
gettext_noop("Number of pages after which previously performed writes are flushed to disk."),
NULL,
GUC_UNIT_BLOCKS
},
&checkpoint_flush_after,
/* see bufmgr.h: OS dependant default */
DEFAULT_CHECKPOINT_FLUSH_AFTER, 0, WRITEBACK_MAX_PENDING_FLUSHES,
NULL, NULL, NULL
},
{
{"backend_flush_after", PGC_USERSET, WAL_CHECKPOINTS,
gettext_noop("Number of pages after which previously performed writes are flushed to disk."),
NULL,
GUC_UNIT_BLOCKS
},
&backend_flush_after,
/* see bufmgr.h: OS dependant default */
DEFAULT_BACKEND_FLUSH_AFTER, 0, WRITEBACK_MAX_PENDING_FLUSHES,
NULL, NULL, NULL
},
{
{"bgwriter_flush_after", PGC_SIGHUP, WAL_CHECKPOINTS,
gettext_noop("Number of pages after which previously performed writes are flushed to disk."),
NULL,
GUC_UNIT_BLOCKS
},
&bgwriter_flush_after,
/* see bufmgr.h: 16 on Linux, 0 otherwise */
DEFAULT_BGWRITER_FLUSH_AFTER, 0, WRITEBACK_MAX_PENDING_FLUSHES,
NULL, NULL, NULL
},
{ {
{"max_worker_processes", {"max_worker_processes",
PGC_POSTMASTER, PGC_POSTMASTER,
......
...@@ -16,6 +16,7 @@ ...@@ -16,6 +16,7 @@
#define BUFMGR_INTERNALS_H #define BUFMGR_INTERNALS_H
#include "storage/buf.h" #include "storage/buf.h"
#include "storage/bufmgr.h"
#include "storage/latch.h" #include "storage/latch.h"
#include "storage/lwlock.h" #include "storage/lwlock.h"
#include "storage/shmem.h" #include "storage/shmem.h"
...@@ -208,16 +209,44 @@ extern PGDLLIMPORT LWLockMinimallyPadded *BufferIOLWLockArray; ...@@ -208,16 +209,44 @@ extern PGDLLIMPORT LWLockMinimallyPadded *BufferIOLWLockArray;
#define UnlockBufHdr(bufHdr) SpinLockRelease(&(bufHdr)->buf_hdr_lock) #define UnlockBufHdr(bufHdr) SpinLockRelease(&(bufHdr)->buf_hdr_lock)
/*
* The PendingWriteback & WritebackContext structure are used to keep
* information about pending flush requests to be issued to the OS.
*/
typedef struct PendingWriteback
{
/* could store different types of pending flushes here */
BufferTag tag;
} PendingWriteback;
/* struct forward declared in bufmgr.h */
typedef struct WritebackContext
{
/* pointer to the max number of writeback requests to coalesce */
int *max_pending;
/* current number of pending writeback requests */
int nr_pending;
/* pending requests */
PendingWriteback pending_writebacks[WRITEBACK_MAX_PENDING_FLUSHES];
} WritebackContext;
/* in buf_init.c */ /* in buf_init.c */
extern PGDLLIMPORT BufferDescPadded *BufferDescriptors; extern PGDLLIMPORT BufferDescPadded *BufferDescriptors;
extern PGDLLIMPORT WritebackContext BackendWritebackContext;
/* in localbuf.c */ /* in localbuf.c */
extern BufferDesc *LocalBufferDescriptors; extern BufferDesc *LocalBufferDescriptors;
/* /*
* Internal routines: only called by bufmgr * Internal buffer management routines
*/ */
/* bufmgr.c */
extern void WritebackContextInit(WritebackContext *context, int *max_coalesce);
extern void IssuePendingWritebacks(WritebackContext *context);
extern void ScheduleBufferTagForWriteback(WritebackContext *context, BufferTag *tag);
/* freelist.c */ /* freelist.c */
extern BufferDesc *StrategyGetBuffer(BufferAccessStrategy strategy); extern BufferDesc *StrategyGetBuffer(BufferAccessStrategy strategy);
......
...@@ -45,16 +45,36 @@ typedef enum ...@@ -45,16 +45,36 @@ typedef enum
* replay; otherwise same as RBM_NORMAL */ * replay; otherwise same as RBM_NORMAL */
} ReadBufferMode; } ReadBufferMode;
/* forward declared, to avoid having to expose buf_internals.h here */
struct WritebackContext;
/* in globals.c ... this duplicates miscadmin.h */ /* in globals.c ... this duplicates miscadmin.h */
extern PGDLLIMPORT int NBuffers; extern PGDLLIMPORT int NBuffers;
/* in bufmgr.c */ /* in bufmgr.c */
#define WRITEBACK_MAX_PENDING_FLUSHES 256
/* FIXME: Also default to on for mmap && msync(MS_ASYNC)? */
#ifdef HAVE_SYNC_FILE_RANGE
#define DEFAULT_CHECKPOINT_FLUSH_AFTER 32
#define DEFAULT_BACKEND_FLUSH_AFTER 16
#define DEFAULT_BGWRITER_FLUSH_AFTER 64
#else
#define DEFAULT_CHECKPOINT_FLUSH_AFTER 0
#define DEFAULT_BACKEND_FLUSH_AFTER 0
#define DEFAULT_BGWRITER_FLUSH_AFTER 0
#endif /* HAVE_SYNC_FILE_RANGE */
extern bool zero_damaged_pages; extern bool zero_damaged_pages;
extern int bgwriter_lru_maxpages; extern int bgwriter_lru_maxpages;
extern double bgwriter_lru_multiplier; extern double bgwriter_lru_multiplier;
extern bool track_io_timing; extern bool track_io_timing;
extern int target_prefetch_pages; extern int target_prefetch_pages;
extern int checkpoint_flush_after;
extern int backend_flush_after;
extern int bgwriter_flush_after;
/* in buf_init.c */ /* in buf_init.c */
extern PGDLLIMPORT char *BufferBlocks; extern PGDLLIMPORT char *BufferBlocks;
...@@ -209,7 +229,7 @@ extern bool HoldingBufferPinThatDelaysRecovery(void); ...@@ -209,7 +229,7 @@ extern bool HoldingBufferPinThatDelaysRecovery(void);
extern void AbortBufferIO(void); extern void AbortBufferIO(void);
extern void BufmgrCommit(void); extern void BufmgrCommit(void);
extern bool BgBufferSync(void); extern bool BgBufferSync(struct WritebackContext *wb_context);
extern void AtProcExit_LocalBuffers(void); extern void AtProcExit_LocalBuffers(void);
......
...@@ -74,6 +74,7 @@ extern int FileWrite(File file, char *buffer, int amount); ...@@ -74,6 +74,7 @@ extern int FileWrite(File file, char *buffer, int amount);
extern int FileSync(File file); extern int FileSync(File file);
extern off_t FileSeek(File file, off_t offset, int whence); extern off_t FileSeek(File file, off_t offset, int whence);
extern int FileTruncate(File file, off_t offset); extern int FileTruncate(File file, off_t offset);
extern void FileWriteback(File file, off_t offset, int amount);
extern char *FilePathName(File file); extern char *FilePathName(File file);
extern int FileGetRawDesc(File file); extern int FileGetRawDesc(File file);
extern int FileGetRawFlags(File file); extern int FileGetRawFlags(File file);
...@@ -115,7 +116,7 @@ extern int pg_fsync(int fd); ...@@ -115,7 +116,7 @@ extern int pg_fsync(int fd);
extern int pg_fsync_no_writethrough(int fd); extern int pg_fsync_no_writethrough(int fd);
extern int pg_fsync_writethrough(int fd); extern int pg_fsync_writethrough(int fd);
extern int pg_fdatasync(int fd); extern int pg_fdatasync(int fd);
extern int pg_flush_data(int fd, off_t offset, off_t amount); extern void pg_flush_data(int fd, off_t offset, off_t amount);
extern void fsync_fname(const char *fname, bool isdir); extern void fsync_fname(const char *fname, bool isdir);
extern int durable_rename(const char *oldfile, const char *newfile, int loglevel); extern int durable_rename(const char *oldfile, const char *newfile, int loglevel);
extern int durable_link_or_rename(const char *oldfile, const char *newfile, int loglevel); extern int durable_link_or_rename(const char *oldfile, const char *newfile, int loglevel);
......
...@@ -96,6 +96,8 @@ extern void smgrread(SMgrRelation reln, ForkNumber forknum, ...@@ -96,6 +96,8 @@ extern void smgrread(SMgrRelation reln, ForkNumber forknum,
BlockNumber blocknum, char *buffer); BlockNumber blocknum, char *buffer);
extern void smgrwrite(SMgrRelation reln, ForkNumber forknum, extern void smgrwrite(SMgrRelation reln, ForkNumber forknum,
BlockNumber blocknum, char *buffer, bool skipFsync); BlockNumber blocknum, char *buffer, bool skipFsync);
extern void smgrwriteback(SMgrRelation reln, ForkNumber forknum,
BlockNumber blocknum, int nblocks);
extern BlockNumber smgrnblocks(SMgrRelation reln, ForkNumber forknum); extern BlockNumber smgrnblocks(SMgrRelation reln, ForkNumber forknum);
extern void smgrtruncate(SMgrRelation reln, ForkNumber forknum, extern void smgrtruncate(SMgrRelation reln, ForkNumber forknum,
BlockNumber nblocks); BlockNumber nblocks);
...@@ -122,6 +124,8 @@ extern void mdread(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum, ...@@ -122,6 +124,8 @@ extern void mdread(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum,
char *buffer); char *buffer);
extern void mdwrite(SMgrRelation reln, ForkNumber forknum, extern void mdwrite(SMgrRelation reln, ForkNumber forknum,
BlockNumber blocknum, char *buffer, bool skipFsync); BlockNumber blocknum, char *buffer, bool skipFsync);
extern void mdwriteback(SMgrRelation reln, ForkNumber forknum,
BlockNumber blocknum, int nblocks);
extern BlockNumber mdnblocks(SMgrRelation reln, ForkNumber forknum); extern BlockNumber mdnblocks(SMgrRelation reln, ForkNumber forknum);
extern void mdtruncate(SMgrRelation reln, ForkNumber forknum, extern void mdtruncate(SMgrRelation reln, ForkNumber forknum,
BlockNumber nblocks); BlockNumber nblocks);
......
...@@ -1411,6 +1411,7 @@ Pattern_Type ...@@ -1411,6 +1411,7 @@ Pattern_Type
PendingOperationEntry PendingOperationEntry
PendingRelDelete PendingRelDelete
PendingUnlinkEntry PendingUnlinkEntry
PendingWriteback
PerlInterpreter PerlInterpreter
Perl_ppaddr_t Perl_ppaddr_t
Permutation Permutation
...@@ -2142,6 +2143,7 @@ WriteBytePtr ...@@ -2142,6 +2143,7 @@ WriteBytePtr
WriteDataPtr WriteDataPtr
WriteExtraTocPtr WriteExtraTocPtr
WriteFunc WriteFunc
WritebackContext
X509 X509
X509_NAME X509_NAME
X509_NAME_ENTRY X509_NAME_ENTRY
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment