Commit 57aa5b2b authored by Fujii Masao's avatar Fujii Masao

Add GUC to enable compression of full page images stored in WAL.

When newly-added GUC parameter, wal_compression, is on, the PostgreSQL server
compresses a full page image written to WAL when full_page_writes is on or
during a base backup. A compressed page image will be decompressed during WAL
replay. Turning this parameter on can reduce the WAL volume without increasing
the risk of unrecoverable data corruption, but at the cost of some extra CPU
spent on the compression during WAL logging and on the decompression during
WAL replay.

This commit changes the WAL format (so bumping WAL version number) so that
the one-byte flag indicating whether a full page image is compressed or not is
included in its header information. This means that the commit increases the
WAL volume one-byte per a full page image even if WAL compression is not used
at all. We can save that one-byte by borrowing one-bit from the existing field
like hole_offset in the header and using it as the flag, for example. But which
would reduce the code readability and the extensibility of the feature.
Per discussion, it's not worth paying those prices to save only one-byte, so we
decided to add the one-byte flag to the header.

This commit doesn't introduce any new compression algorithm like lz4.
Currently a full page image is compressed using the existing PGLZ algorithm.
Per discussion, we decided to use it at least in the first version of the
feature because there were no performance reports showing that its compression
ratio is unacceptably lower than that of other algorithm. Of course,
in the future, it's worth considering the support of other compression
algorithm for the better compression.

Rahila Syed and Michael Paquier, reviewed in various versions by myself,
Andres Freund, Robert Haas, Abhijit Menon-Sen and many others.
parent 2fbb2866
...@@ -359,18 +359,17 @@ XLogDumpCountRecord(XLogDumpConfig *config, XLogDumpStats *stats, ...@@ -359,18 +359,17 @@ XLogDumpCountRecord(XLogDumpConfig *config, XLogDumpStats *stats,
rec_len = XLogRecGetDataLen(record) + SizeOfXLogRecord; rec_len = XLogRecGetDataLen(record) + SizeOfXLogRecord;
/* /*
* Calculate the amount of FPI data in the record. Each backup block * Calculate the amount of FPI data in the record.
* takes up BLCKSZ bytes, minus the "hole" length.
* *
* XXX: We peek into xlogreader's private decoded backup blocks for the * XXX: We peek into xlogreader's private decoded backup blocks for the
* hole_length. It doesn't seem worth it to add an accessor macro for * bimg_len indicating the length of FPI data. It doesn't seem worth it to
* this. * add an accessor macro for this.
*/ */
fpi_len = 0; fpi_len = 0;
for (block_id = 0; block_id <= record->max_block_id; block_id++) for (block_id = 0; block_id <= record->max_block_id; block_id++)
{ {
if (XLogRecHasBlockImage(record, block_id)) if (XLogRecHasBlockImage(record, block_id))
fpi_len += BLCKSZ - record->blocks[block_id].hole_length; fpi_len += record->blocks[block_id].bimg_len;
} }
/* Update per-rmgr statistics */ /* Update per-rmgr statistics */
...@@ -465,9 +464,22 @@ XLogDumpDisplayRecord(XLogDumpConfig *config, XLogReaderState *record) ...@@ -465,9 +464,22 @@ XLogDumpDisplayRecord(XLogDumpConfig *config, XLogReaderState *record)
blk); blk);
if (XLogRecHasBlockImage(record, block_id)) if (XLogRecHasBlockImage(record, block_id))
{ {
printf(" (FPW); hole: offset: %u, length: %u\n", if (record->blocks[block_id].bimg_info &
record->blocks[block_id].hole_offset, BKPIMAGE_IS_COMPRESSED)
record->blocks[block_id].hole_length); {
printf(" (FPW); hole: offset: %u, length: %u, compression saved: %u\n",
record->blocks[block_id].hole_offset,
record->blocks[block_id].hole_length,
BLCKSZ -
record->blocks[block_id].hole_length -
record->blocks[block_id].bimg_len);
}
else
{
printf(" (FPW); hole: offset: %u, length: %u\n",
record->blocks[block_id].hole_offset,
record->blocks[block_id].hole_length);
}
} }
putchar('\n'); putchar('\n');
} }
......
...@@ -2282,6 +2282,30 @@ include_dir 'conf.d' ...@@ -2282,6 +2282,30 @@ include_dir 'conf.d'
</listitem> </listitem>
</varlistentry> </varlistentry>
<varlistentry id="guc-wal-compression" xreflabel="wal_compression">
<term><varname>wal_compression</varname> (<type>boolean</type>)
<indexterm>
<primary><varname>wal_compression</> configuration parameter</primary>
</indexterm>
</term>
<listitem>
<para>
When this parameter is <literal>on</>, the <productname>PostgreSQL</>
server compresses a full page image written to WAL when
<xref linkend="guc-full-page-writes"> is on or during a base backup.
A compressed page image will be decompressed during WAL replay.
The default value is <literal>off</>.
</para>
<para>
Turning this parameter on can reduce the WAL volume without
increasing the risk of unrecoverable data corruption,
but at the cost of some extra CPU spent on the compression during
WAL logging and on the decompression during WAL replay.
</para>
</listitem>
</varlistentry>
<varlistentry id="guc-wal-buffers" xreflabel="wal_buffers"> <varlistentry id="guc-wal-buffers" xreflabel="wal_buffers">
<term><varname>wal_buffers</varname> (<type>integer</type>) <term><varname>wal_buffers</varname> (<type>integer</type>)
<indexterm> <indexterm>
......
...@@ -89,6 +89,7 @@ char *XLogArchiveCommand = NULL; ...@@ -89,6 +89,7 @@ char *XLogArchiveCommand = NULL;
bool EnableHotStandby = false; bool EnableHotStandby = false;
bool fullPageWrites = true; bool fullPageWrites = true;
bool wal_log_hints = false; bool wal_log_hints = false;
bool wal_compression = false;
bool log_checkpoints = false; bool log_checkpoints = false;
int sync_method = DEFAULT_SYNC_METHOD; int sync_method = DEFAULT_SYNC_METHOD;
int wal_level = WAL_LEVEL_MINIMAL; int wal_level = WAL_LEVEL_MINIMAL;
......
...@@ -24,12 +24,16 @@ ...@@ -24,12 +24,16 @@
#include "access/xlog_internal.h" #include "access/xlog_internal.h"
#include "access/xloginsert.h" #include "access/xloginsert.h"
#include "catalog/pg_control.h" #include "catalog/pg_control.h"
#include "common/pg_lzcompress.h"
#include "miscadmin.h" #include "miscadmin.h"
#include "storage/bufmgr.h" #include "storage/bufmgr.h"
#include "storage/proc.h" #include "storage/proc.h"
#include "utils/memutils.h" #include "utils/memutils.h"
#include "pg_trace.h" #include "pg_trace.h"
/* Buffer size required to store a compressed version of backup block image */
#define PGLZ_MAX_BLCKSZ PGLZ_MAX_OUTPUT(BLCKSZ)
/* /*
* For each block reference registered with XLogRegisterBuffer, we fill in * For each block reference registered with XLogRegisterBuffer, we fill in
* a registered_buffer struct. * a registered_buffer struct.
...@@ -50,6 +54,9 @@ typedef struct ...@@ -50,6 +54,9 @@ typedef struct
XLogRecData bkp_rdatas[2]; /* temporary rdatas used to hold references to XLogRecData bkp_rdatas[2]; /* temporary rdatas used to hold references to
* backup block data in XLogRecordAssemble() */ * backup block data in XLogRecordAssemble() */
/* buffer to store a compressed version of backup block image */
char compressed_page[PGLZ_MAX_BLCKSZ];
} registered_buffer; } registered_buffer;
static registered_buffer *registered_buffers; static registered_buffer *registered_buffers;
...@@ -96,6 +103,8 @@ static MemoryContext xloginsert_cxt; ...@@ -96,6 +103,8 @@ static MemoryContext xloginsert_cxt;
static XLogRecData *XLogRecordAssemble(RmgrId rmid, uint8 info, static XLogRecData *XLogRecordAssemble(RmgrId rmid, uint8 info,
XLogRecPtr RedoRecPtr, bool doPageWrites, XLogRecPtr RedoRecPtr, bool doPageWrites,
XLogRecPtr *fpw_lsn); XLogRecPtr *fpw_lsn);
static bool XLogCompressBackupBlock(char *page, uint16 hole_offset,
uint16 hole_length, char *dest, uint16 *dlen);
/* /*
* Begin constructing a WAL record. This must be called before the * Begin constructing a WAL record. This must be called before the
...@@ -482,7 +491,11 @@ XLogRecordAssemble(RmgrId rmid, uint8 info, ...@@ -482,7 +491,11 @@ XLogRecordAssemble(RmgrId rmid, uint8 info,
bool needs_data; bool needs_data;
XLogRecordBlockHeader bkpb; XLogRecordBlockHeader bkpb;
XLogRecordBlockImageHeader bimg; XLogRecordBlockImageHeader bimg;
XLogRecordBlockCompressHeader cbimg;
bool samerel; bool samerel;
bool is_compressed = false;
uint16 hole_length;
uint16 hole_offset;
if (!regbuf->in_use) if (!regbuf->in_use)
continue; continue;
...@@ -529,9 +542,11 @@ XLogRecordAssemble(RmgrId rmid, uint8 info, ...@@ -529,9 +542,11 @@ XLogRecordAssemble(RmgrId rmid, uint8 info,
if (needs_backup) if (needs_backup)
{ {
Page page = regbuf->page; Page page = regbuf->page;
uint16 compressed_len;
/* /*
* The page needs to be backed up, so set up *bimg * The page needs to be backed up, so calculate its hole length
* and offset.
*/ */
if (regbuf->flags & REGBUF_STANDARD) if (regbuf->flags & REGBUF_STANDARD)
{ {
...@@ -543,50 +558,81 @@ XLogRecordAssemble(RmgrId rmid, uint8 info, ...@@ -543,50 +558,81 @@ XLogRecordAssemble(RmgrId rmid, uint8 info,
upper > lower && upper > lower &&
upper <= BLCKSZ) upper <= BLCKSZ)
{ {
bimg.hole_offset = lower; hole_offset = lower;
bimg.hole_length = upper - lower; hole_length = upper - lower;
} }
else else
{ {
/* No "hole" to compress out */ /* No "hole" to compress out */
bimg.hole_offset = 0; hole_offset = 0;
bimg.hole_length = 0; hole_length = 0;
} }
} }
else else
{ {
/* Not a standard page header, don't try to eliminate "hole" */ /* Not a standard page header, don't try to eliminate "hole" */
bimg.hole_offset = 0; hole_offset = 0;
bimg.hole_length = 0; hole_length = 0;
}
/*
* Try to compress a block image if wal_compression is enabled
*/
if (wal_compression)
{
is_compressed =
XLogCompressBackupBlock(page, hole_offset, hole_length,
regbuf->compressed_page,
&compressed_len);
} }
/* Fill in the remaining fields in the XLogRecordBlockHeader struct */ /* Fill in the remaining fields in the XLogRecordBlockHeader struct */
bkpb.fork_flags |= BKPBLOCK_HAS_IMAGE; bkpb.fork_flags |= BKPBLOCK_HAS_IMAGE;
total_len += BLCKSZ - bimg.hole_length;
/* /*
* Construct XLogRecData entries for the page content. * Construct XLogRecData entries for the page content.
*/ */
rdt_datas_last->next = &regbuf->bkp_rdatas[0]; rdt_datas_last->next = &regbuf->bkp_rdatas[0];
rdt_datas_last = rdt_datas_last->next; rdt_datas_last = rdt_datas_last->next;
if (bimg.hole_length == 0)
bimg.bimg_info = (hole_length == 0) ? 0 : BKPIMAGE_HAS_HOLE;
if (is_compressed)
{ {
rdt_datas_last->data = page; bimg.length = compressed_len;
rdt_datas_last->len = BLCKSZ; bimg.hole_offset = hole_offset;
bimg.bimg_info |= BKPIMAGE_IS_COMPRESSED;
if (hole_length != 0)
cbimg.hole_length = hole_length;
rdt_datas_last->data = regbuf->compressed_page;
rdt_datas_last->len = compressed_len;
} }
else else
{ {
/* must skip the hole */ bimg.length = BLCKSZ - hole_length;
rdt_datas_last->data = page; bimg.hole_offset = hole_offset;
rdt_datas_last->len = bimg.hole_offset;
rdt_datas_last->next = &regbuf->bkp_rdatas[1]; if (hole_length == 0)
rdt_datas_last = rdt_datas_last->next; {
rdt_datas_last->data = page;
rdt_datas_last->len = BLCKSZ;
}
else
{
/* must skip the hole */
rdt_datas_last->data = page;
rdt_datas_last->len = hole_offset;
rdt_datas_last->data = page + (bimg.hole_offset + bimg.hole_length); rdt_datas_last->next = &regbuf->bkp_rdatas[1];
rdt_datas_last->len = BLCKSZ - (bimg.hole_offset + bimg.hole_length); rdt_datas_last = rdt_datas_last->next;
rdt_datas_last->data = page + (hole_offset + hole_length);
rdt_datas_last->len = BLCKSZ - (hole_offset + hole_length);
}
} }
total_len += bimg.length;
} }
if (needs_data) if (needs_data)
...@@ -619,6 +665,12 @@ XLogRecordAssemble(RmgrId rmid, uint8 info, ...@@ -619,6 +665,12 @@ XLogRecordAssemble(RmgrId rmid, uint8 info,
{ {
memcpy(scratch, &bimg, SizeOfXLogRecordBlockImageHeader); memcpy(scratch, &bimg, SizeOfXLogRecordBlockImageHeader);
scratch += SizeOfXLogRecordBlockImageHeader; scratch += SizeOfXLogRecordBlockImageHeader;
if (hole_length != 0 && is_compressed)
{
memcpy(scratch, &cbimg,
SizeOfXLogRecordBlockCompressHeader);
scratch += SizeOfXLogRecordBlockCompressHeader;
}
} }
if (!samerel) if (!samerel)
{ {
...@@ -680,6 +732,57 @@ XLogRecordAssemble(RmgrId rmid, uint8 info, ...@@ -680,6 +732,57 @@ XLogRecordAssemble(RmgrId rmid, uint8 info,
return &hdr_rdt; return &hdr_rdt;
} }
/*
* Create a compressed version of a backup block image.
*
* Returns FALSE if compression fails (i.e., compressed result is actually
* bigger than original). Otherwise, returns TRUE and sets 'dlen' to
* the length of compressed block image.
*/
static bool
XLogCompressBackupBlock(char * page, uint16 hole_offset, uint16 hole_length,
char *dest, uint16 *dlen)
{
int32 orig_len = BLCKSZ - hole_length;
int32 len;
int32 extra_bytes = 0;
char *source;
char tmp[BLCKSZ];
if (hole_length != 0)
{
/* must skip the hole */
source = tmp;
memcpy(source, page, hole_offset);
memcpy(source + hole_offset,
page + (hole_offset + hole_length),
BLCKSZ - (hole_length + hole_offset));
/*
* Extra data needs to be stored in WAL record for the compressed
* version of block image if the hole exists.
*/
extra_bytes = SizeOfXLogRecordBlockCompressHeader;
}
else
source = page;
/*
* We recheck the actual size even if pglz_compress() reports success
* and see if the number of bytes saved by compression is larger than
* the length of extra data needed for the compressed version of block
* image.
*/
len = pglz_compress(source, orig_len, dest, PGLZ_strategy_default);
if (len >= 0 &&
len + extra_bytes < orig_len)
{
*dlen = (uint16) len; /* successful compression */
return true;
}
return false;
}
/* /*
* Determine whether the buffer referenced has to be backed up. * Determine whether the buffer referenced has to be backed up.
* *
......
...@@ -20,6 +20,7 @@ ...@@ -20,6 +20,7 @@
#include "access/xlog_internal.h" #include "access/xlog_internal.h"
#include "access/xlogreader.h" #include "access/xlogreader.h"
#include "catalog/pg_control.h" #include "catalog/pg_control.h"
#include "common/pg_lzcompress.h"
static bool allocate_recordbuf(XLogReaderState *state, uint32 reclength); static bool allocate_recordbuf(XLogReaderState *state, uint32 reclength);
...@@ -1037,9 +1038,78 @@ DecodeXLogRecord(XLogReaderState *state, XLogRecord *record, char **errormsg) ...@@ -1037,9 +1038,78 @@ DecodeXLogRecord(XLogReaderState *state, XLogRecord *record, char **errormsg)
if (blk->has_image) if (blk->has_image)
{ {
COPY_HEADER_FIELD(&blk->bimg_len, sizeof(uint16));
COPY_HEADER_FIELD(&blk->hole_offset, sizeof(uint16)); COPY_HEADER_FIELD(&blk->hole_offset, sizeof(uint16));
COPY_HEADER_FIELD(&blk->hole_length, sizeof(uint16)); COPY_HEADER_FIELD(&blk->bimg_info, sizeof(uint8));
datatotal += BLCKSZ - blk->hole_length; if (blk->bimg_info & BKPIMAGE_IS_COMPRESSED)
{
if (blk->bimg_info & BKPIMAGE_HAS_HOLE)
COPY_HEADER_FIELD(&blk->hole_length, sizeof(uint16));
else
blk->hole_length = 0;
}
else
blk->hole_length = BLCKSZ - blk->bimg_len;
datatotal += blk->bimg_len;
/*
* cross-check that hole_offset > 0, hole_length > 0 and
* bimg_len < BLCKSZ if the HAS_HOLE flag is set.
*/
if ((blk->bimg_info & BKPIMAGE_HAS_HOLE) &&
(blk->hole_offset == 0 ||
blk->hole_length == 0 ||
blk->bimg_len == BLCKSZ))
{
report_invalid_record(state,
"BKPIMAGE_HAS_HOLE set, but hole offset %u length %u block image length %u at %X/%X",
(unsigned int) blk->hole_offset,
(unsigned int) blk->hole_length,
(unsigned int) blk->bimg_len,
(uint32) (state->ReadRecPtr >> 32), (uint32) state->ReadRecPtr);
goto err;
}
/*
* cross-check that hole_offset == 0 and hole_length == 0
* if the HAS_HOLE flag is not set.
*/
if (!(blk->bimg_info & BKPIMAGE_HAS_HOLE) &&
(blk->hole_offset != 0 || blk->hole_length != 0))
{
report_invalid_record(state,
"BKPIMAGE_HAS_HOLE not set, but hole offset %u length %u at %X/%X",
(unsigned int) blk->hole_offset,
(unsigned int) blk->hole_length,
(uint32) (state->ReadRecPtr >> 32), (uint32) state->ReadRecPtr);
goto err;
}
/*
* cross-check that bimg_len < BLCKSZ
* if the IS_COMPRESSED flag is set.
*/
if ((blk->bimg_info & BKPIMAGE_IS_COMPRESSED) &&
blk->bimg_len == BLCKSZ)
{
report_invalid_record(state,
"BKPIMAGE_IS_COMPRESSED set, but block image length %u at %X/%X",
(unsigned int) blk->bimg_len,
(uint32) (state->ReadRecPtr >> 32), (uint32) state->ReadRecPtr);
goto err;
}
/*
* cross-check that bimg_len = BLCKSZ if neither
* HAS_HOLE nor IS_COMPRESSED flag is set.
*/
if (!(blk->bimg_info & BKPIMAGE_HAS_HOLE) &&
!(blk->bimg_info & BKPIMAGE_IS_COMPRESSED) &&
blk->bimg_len != BLCKSZ)
{
report_invalid_record(state,
"neither BKPIMAGE_HAS_HOLE nor BKPIMAGE_IS_COMPRESSED set, but block image length is %u at %X/%X",
(unsigned int) blk->data_len,
(uint32) (state->ReadRecPtr >> 32), (uint32) state->ReadRecPtr);
goto err;
}
} }
if (!(fork_flags & BKPBLOCK_SAME_REL)) if (!(fork_flags & BKPBLOCK_SAME_REL))
{ {
...@@ -1094,7 +1164,7 @@ DecodeXLogRecord(XLogReaderState *state, XLogRecord *record, char **errormsg) ...@@ -1094,7 +1164,7 @@ DecodeXLogRecord(XLogReaderState *state, XLogRecord *record, char **errormsg)
if (blk->has_image) if (blk->has_image)
{ {
blk->bkp_image = ptr; blk->bkp_image = ptr;
ptr += BLCKSZ - blk->hole_length; ptr += blk->bimg_len;
} }
if (blk->has_data) if (blk->has_data)
{ {
...@@ -1200,6 +1270,8 @@ bool ...@@ -1200,6 +1270,8 @@ bool
RestoreBlockImage(XLogReaderState *record, uint8 block_id, char *page) RestoreBlockImage(XLogReaderState *record, uint8 block_id, char *page)
{ {
DecodedBkpBlock *bkpb; DecodedBkpBlock *bkpb;
char *ptr;
char tmp[BLCKSZ];
if (!record->blocks[block_id].in_use) if (!record->blocks[block_id].in_use)
return false; return false;
...@@ -1207,18 +1279,35 @@ RestoreBlockImage(XLogReaderState *record, uint8 block_id, char *page) ...@@ -1207,18 +1279,35 @@ RestoreBlockImage(XLogReaderState *record, uint8 block_id, char *page)
return false; return false;
bkpb = &record->blocks[block_id]; bkpb = &record->blocks[block_id];
ptr = bkpb->bkp_image;
if (bkpb->bimg_info & BKPIMAGE_IS_COMPRESSED)
{
/* If a backup block image is compressed, decompress it */
if (pglz_decompress(ptr, bkpb->bimg_len, tmp,
BLCKSZ - bkpb->hole_length) < 0)
{
report_invalid_record(record, "invalid compressed image at %X/%X, block %d",
(uint32) (record->ReadRecPtr >> 32),
(uint32) record->ReadRecPtr,
block_id);
return false;
}
ptr = tmp;
}
/* generate page, taking into account hole if necessary */
if (bkpb->hole_length == 0) if (bkpb->hole_length == 0)
{ {
memcpy(page, bkpb->bkp_image, BLCKSZ); memcpy(page, ptr, BLCKSZ);
} }
else else
{ {
memcpy(page, bkpb->bkp_image, bkpb->hole_offset); memcpy(page, ptr, bkpb->hole_offset);
/* must zero-fill the hole */ /* must zero-fill the hole */
MemSet(page + bkpb->hole_offset, 0, bkpb->hole_length); MemSet(page + bkpb->hole_offset, 0, bkpb->hole_length);
memcpy(page + (bkpb->hole_offset + bkpb->hole_length), memcpy(page + (bkpb->hole_offset + bkpb->hole_length),
bkpb->bkp_image + bkpb->hole_offset, ptr + bkpb->hole_offset,
BLCKSZ - (bkpb->hole_offset + bkpb->hole_length)); BLCKSZ - (bkpb->hole_offset + bkpb->hole_length));
} }
......
...@@ -996,6 +996,16 @@ static struct config_bool ConfigureNamesBool[] = ...@@ -996,6 +996,16 @@ static struct config_bool ConfigureNamesBool[] =
NULL, NULL, NULL NULL, NULL, NULL
}, },
{
{"wal_compression", PGC_USERSET, WAL_SETTINGS,
gettext_noop("Compresses full-page writes written in WAL file."),
NULL
},
&wal_compression,
false,
NULL, NULL, NULL
},
{ {
{"log_checkpoints", PGC_SIGHUP, LOGGING_WHAT, {"log_checkpoints", PGC_SIGHUP, LOGGING_WHAT,
gettext_noop("Logs each checkpoint."), gettext_noop("Logs each checkpoint."),
......
...@@ -186,6 +186,7 @@ ...@@ -186,6 +186,7 @@
# fsync_writethrough # fsync_writethrough
# open_sync # open_sync
#full_page_writes = on # recover from partial page writes #full_page_writes = on # recover from partial page writes
#wal_compression = off # enable compression of full-page writes
#wal_log_hints = off # also do full page writes of non-critical updates #wal_log_hints = off # also do full page writes of non-critical updates
# (change requires restart) # (change requires restart)
#wal_buffers = -1 # min 32kB, -1 sets based on shared_buffers #wal_buffers = -1 # min 32kB, -1 sets based on shared_buffers
......
...@@ -100,6 +100,7 @@ extern char *XLogArchiveCommand; ...@@ -100,6 +100,7 @@ extern char *XLogArchiveCommand;
extern bool EnableHotStandby; extern bool EnableHotStandby;
extern bool fullPageWrites; extern bool fullPageWrites;
extern bool wal_log_hints; extern bool wal_log_hints;
extern bool wal_compression;
extern bool log_checkpoints; extern bool log_checkpoints;
extern int CheckPointSegments; extern int CheckPointSegments;
......
...@@ -31,7 +31,7 @@ ...@@ -31,7 +31,7 @@
/* /*
* Each page of XLOG file has a header like this: * Each page of XLOG file has a header like this:
*/ */
#define XLOG_PAGE_MAGIC 0xD081 /* can be used as WAL version indicator */ #define XLOG_PAGE_MAGIC 0xD082 /* can be used as WAL version indicator */
typedef struct XLogPageHeaderData typedef struct XLogPageHeaderData
{ {
......
...@@ -55,6 +55,8 @@ typedef struct ...@@ -55,6 +55,8 @@ typedef struct
char *bkp_image; char *bkp_image;
uint16 hole_offset; uint16 hole_offset;
uint16 hole_length; uint16 hole_length;
uint16 bimg_len;
uint8 bimg_info;
/* Buffer holding the rmgr-specific data associated with this block */ /* Buffer holding the rmgr-specific data associated with this block */
bool has_data; bool has_data;
......
...@@ -100,18 +100,55 @@ typedef struct XLogRecordBlockHeader ...@@ -100,18 +100,55 @@ typedef struct XLogRecordBlockHeader
* *
* As a trivial form of data compression, the XLOG code is aware that * As a trivial form of data compression, the XLOG code is aware that
* PG data pages usually contain an unused "hole" in the middle, which * PG data pages usually contain an unused "hole" in the middle, which
* contains only zero bytes. If hole_length > 0 then we have removed * contains only zero bytes. If the length of "hole" > 0 then we have removed
* such a "hole" from the stored data (and it's not counted in the * such a "hole" from the stored data (and it's not counted in the
* XLOG record's CRC, either). Hence, the amount of block data actually * XLOG record's CRC, either). Hence, the amount of block data actually
* present is BLCKSZ - hole_length bytes. * present is BLCKSZ - the length of "hole" bytes.
*
* When wal_compression is enabled, a full page image which "hole" was
* removed is additionally compressed using PGLZ compression algorithm.
* This can reduce the WAL volume, but at some extra cost of CPU spent
* on the compression during WAL logging. In this case, since the "hole"
* length cannot be calculated by subtracting the number of page image bytes
* from BLCKSZ, basically it needs to be stored as an extra information.
* But when no "hole" exists, we can assume that the "hole" length is zero
* and no such an extra information needs to be stored. Note that
* the original version of page image is stored in WAL instead of the
* compressed one if the number of bytes saved by compression is less than
* the length of extra information. Hence, when a page image is successfully
* compressed, the amount of block data actually present is less than
* BLCKSZ - the length of "hole" bytes - the length of extra information.
*/ */
typedef struct XLogRecordBlockImageHeader typedef struct XLogRecordBlockImageHeader
{ {
uint16 hole_offset; /* number of bytes before "hole" */ uint16 length; /* number of page image bytes */
uint16 hole_length; /* number of bytes in "hole" */ uint16 hole_offset; /* number of bytes before "hole" */
uint8 bimg_info; /* flag bits, see below */
/*
* If BKPIMAGE_HAS_HOLE and BKPIMAGE_IS_COMPRESSED,
* an XLogRecordBlockCompressHeader struct follows.
*/
} XLogRecordBlockImageHeader; } XLogRecordBlockImageHeader;
#define SizeOfXLogRecordBlockImageHeader sizeof(XLogRecordBlockImageHeader) #define SizeOfXLogRecordBlockImageHeader \
(offsetof(XLogRecordBlockImageHeader, bimg_info) + sizeof(uint8))
/* Information stored in bimg_info */
#define BKPIMAGE_HAS_HOLE 0x01 /* page image has "hole" */
#define BKPIMAGE_IS_COMPRESSED 0x02 /* page image is compressed */
/*
* Extra header information used when page image has "hole" and
* is compressed.
*/
typedef struct XLogRecordBlockCompressHeader
{
uint16 hole_length; /* number of bytes in "hole" */
} XLogRecordBlockCompressHeader;
#define SizeOfXLogRecordBlockCompressHeader \
sizeof(XLogRecordBlockCompressHeader)
/* /*
* Maximum size of the header for a block reference. This is used to size a * Maximum size of the header for a block reference. This is used to size a
...@@ -120,6 +157,7 @@ typedef struct XLogRecordBlockImageHeader ...@@ -120,6 +157,7 @@ typedef struct XLogRecordBlockImageHeader
#define MaxSizeOfXLogRecordBlockHeader \ #define MaxSizeOfXLogRecordBlockHeader \
(SizeOfXLogRecordBlockHeader + \ (SizeOfXLogRecordBlockHeader + \
SizeOfXLogRecordBlockImageHeader + \ SizeOfXLogRecordBlockImageHeader + \
SizeOfXLogRecordBlockCompressHeader + \
sizeof(RelFileNode) + \ sizeof(RelFileNode) + \
sizeof(BlockNumber)) sizeof(BlockNumber))
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment