Commit a3115f0d authored by Heikki Linnakangas's avatar Heikki Linnakangas

Only WAL-log the modified portion in an UPDATE, if possible.

When a row is updated, and the new tuple version is put on the same page as
the old one, only WAL-log the part of the new tuple that's not identical to
the old. This saves significantly on the amount of WAL that needs to be
written, in the common case that most fields are not modified.

Amit Kapila, with a lot of back and forth with me, Robert Haas, and others.
parent 17d787a3
......@@ -6605,10 +6605,15 @@ log_heap_update(Relation reln, Buffer oldbuf,
xl_heap_header_len xlhdr;
xl_heap_header_len xlhdr_idx;
uint8 info;
uint16 prefix_suffix[2];
uint16 prefixlen = 0,
suffixlen = 0;
XLogRecPtr recptr;
XLogRecData rdata[7];
XLogRecData rdata[9];
Page page = BufferGetPage(newbuf);
bool need_tuple_data = RelationIsLogicallyLogged(reln);
int nr;
Buffer newbufref;
/* Caller should not call me on a non-WAL-logged relation */
Assert(RelationNeedsWAL(reln));
......@@ -6618,6 +6623,57 @@ log_heap_update(Relation reln, Buffer oldbuf,
else
info = XLOG_HEAP_UPDATE;
/*
* If the old and new tuple are on the same page, we only need to log
* the parts of the new tuple that were changed. That saves on the amount
* of WAL we need to write. Currently, we just count any unchanged bytes
* in the beginning and end of the tuple. That's quick to check, and
* perfectly covers the common case that only one field is updated.
*
* We could do this even if the old and new tuple are on different pages,
* but only if we don't make a full-page image of the old page, which is
* difficult to know in advance. Also, if the old tuple is corrupt for
* some reason, it would allow the corruption to propagate the new page,
* so it seems best to avoid. Under the general assumption that most
* updates tend to create the new tuple version on the same page, there
* isn't much to be gained by doing this across pages anyway.
*
* Skip this if we're taking a full-page image of the new page, as we don't
* include the new tuple in the WAL record in that case. Also disable if
* wal_level='logical', as logical decoding needs to be able to read the
* new tuple in whole from the WAL record alone.
*/
if (oldbuf == newbuf && !need_tuple_data &&
!XLogCheckBufferNeedsBackup(newbuf))
{
char *oldp = (char *) oldtup->t_data + oldtup->t_data->t_hoff;
char *newp = (char *) newtup->t_data + newtup->t_data->t_hoff;
int oldlen = oldtup->t_len - oldtup->t_data->t_hoff;
int newlen = newtup->t_len - newtup->t_data->t_hoff;
/* Check for common prefix between old and new tuple */
for (prefixlen = 0; prefixlen < Min(oldlen, newlen); prefixlen++)
{
if (newp[prefixlen] != oldp[prefixlen])
break;
}
/*
* Storing the length of the prefix takes 2 bytes, so we need to save
* at least 3 bytes or there's no point.
*/
if (prefixlen < 3)
prefixlen = 0;
/* Same for suffix */
for (suffixlen = 0; suffixlen < Min(oldlen, newlen) - prefixlen; suffixlen++)
{
if (newp[newlen - suffixlen - 1] != oldp[oldlen - suffixlen - 1])
break;
}
if (suffixlen < 3)
suffixlen = 0;
}
xlrec.target.node = reln->rd_node;
xlrec.target.tid = oldtup->t_self;
xlrec.old_xmax = HeapTupleHeaderGetRawXmax(oldtup->t_data);
......@@ -6630,41 +6686,119 @@ log_heap_update(Relation reln, Buffer oldbuf,
xlrec.newtid = newtup->t_self;
if (new_all_visible_cleared)
xlrec.flags |= XLOG_HEAP_NEW_ALL_VISIBLE_CLEARED;
if (prefixlen > 0)
xlrec.flags |= XLOG_HEAP_PREFIX_FROM_OLD;
if (suffixlen > 0)
xlrec.flags |= XLOG_HEAP_SUFFIX_FROM_OLD;
rdata[0].data = (char *) &xlrec;
rdata[0].len = SizeOfHeapUpdate;
rdata[0].buffer = InvalidBuffer;
/* If new tuple is the single and first tuple on page... */
if (ItemPointerGetOffsetNumber(&(newtup->t_self)) == FirstOffsetNumber &&
PageGetMaxOffsetNumber(page) == FirstOffsetNumber)
{
info |= XLOG_HEAP_INIT_PAGE;
newbufref = InvalidBuffer;
}
else
newbufref = newbuf;
rdata[0].data = NULL;
rdata[0].len = 0;
rdata[0].buffer = oldbuf;
rdata[0].buffer_std = true;
rdata[0].next = &(rdata[1]);
rdata[1].data = NULL;
rdata[1].len = 0;
rdata[1].buffer = oldbuf;
rdata[1].buffer_std = true;
rdata[1].data = (char *) &xlrec;
rdata[1].len = SizeOfHeapUpdate;
rdata[1].buffer = InvalidBuffer;
rdata[1].next = &(rdata[2]);
/* prefix and/or suffix length fields */
if (prefixlen > 0 || suffixlen > 0)
{
if (prefixlen > 0 && suffixlen > 0)
{
prefix_suffix[0] = prefixlen;
prefix_suffix[1] = suffixlen;
rdata[2].data = (char *) &prefix_suffix;
rdata[2].len = 2 * sizeof(uint16);
}
else if (prefixlen > 0)
{
rdata[2].data = (char *) &prefixlen;
rdata[2].len = sizeof(uint16);
}
else
{
rdata[2].data = (char *) &suffixlen;
rdata[2].len = sizeof(uint16);
}
rdata[2].buffer = newbufref;
rdata[2].buffer_std = true;
rdata[2].next = &(rdata[3]);
nr = 3;
}
else
nr = 2;
xlhdr.header.t_infomask2 = newtup->t_data->t_infomask2;
xlhdr.header.t_infomask = newtup->t_data->t_infomask;
xlhdr.header.t_hoff = newtup->t_data->t_hoff;
xlhdr.t_len = newtup->t_len - offsetof(HeapTupleHeaderData, t_bits);
Assert(offsetof(HeapTupleHeaderData, t_bits) + prefixlen + suffixlen <= newtup->t_len);
xlhdr.t_len = newtup->t_len - offsetof(HeapTupleHeaderData, t_bits) - prefixlen - suffixlen;
/*
* As with insert records, we need not store the rdata[2] segment
* if we decide to store the whole buffer instead unless we're
* doing logical decoding.
* As with insert records, we need not store this rdata segment if we
* decide to store the whole buffer instead, unless we're doing logical
* decoding.
*/
rdata[2].data = (char *) &xlhdr;
rdata[2].len = SizeOfHeapHeaderLen;
rdata[2].buffer = need_tuple_data ? InvalidBuffer : newbuf;
rdata[2].buffer_std = true;
rdata[2].next = &(rdata[3]);
rdata[nr].data = (char *) &xlhdr;
rdata[nr].len = SizeOfHeapHeaderLen;
rdata[nr].buffer = need_tuple_data ? InvalidBuffer : newbufref;
rdata[nr].buffer_std = true;
rdata[nr].next = &(rdata[nr + 1]);
nr++;
/* PG73FORMAT: write bitmap [+ padding] [+ oid] + data */
rdata[3].data = (char *) newtup->t_data
+ offsetof(HeapTupleHeaderData, t_bits);
rdata[3].len = newtup->t_len - offsetof(HeapTupleHeaderData, t_bits);
rdata[3].buffer = need_tuple_data ? InvalidBuffer : newbuf;
rdata[3].buffer_std = true;
rdata[3].next = NULL;
/*
* PG73FORMAT: write bitmap [+ padding] [+ oid] + data
*
* The 'data' doesn't include the common prefix or suffix.
*/
if (prefixlen == 0)
{
rdata[nr].data = ((char *) newtup->t_data) + offsetof(HeapTupleHeaderData, t_bits);
rdata[nr].len = newtup->t_len - offsetof(HeapTupleHeaderData, t_bits) - suffixlen;
rdata[nr].buffer = need_tuple_data ? InvalidBuffer : newbufref;
rdata[nr].buffer_std = true;
rdata[nr].next = NULL;
nr++;
}
else
{
/*
* Have to write the null bitmap and data after the common prefix as
* two separate rdata entries.
*/
/* bitmap [+ padding] [+ oid] */
if (newtup->t_data->t_hoff - offsetof(HeapTupleHeaderData, t_bits) > 0)
{
rdata[nr - 1].next = &(rdata[nr]);
rdata[nr].data = ((char *) newtup->t_data) + offsetof(HeapTupleHeaderData, t_bits);
rdata[nr].len = newtup->t_data->t_hoff - offsetof(HeapTupleHeaderData, t_bits);
rdata[nr].buffer = need_tuple_data ? InvalidBuffer : newbufref;
rdata[nr].buffer_std = true;
rdata[nr].next = NULL;
nr++;
}
/* data after common prefix */
rdata[nr - 1].next = &(rdata[nr]);
rdata[nr].data = ((char *) newtup->t_data) + newtup->t_data->t_hoff + prefixlen;
rdata[nr].len = newtup->t_len - newtup->t_data->t_hoff - prefixlen - suffixlen;
rdata[nr].buffer = need_tuple_data ? InvalidBuffer : newbufref;
rdata[nr].buffer_std = true;
rdata[nr].next = NULL;
nr++;
}
/*
* Separate storage for the FPW buffer reference of the new page in the
......@@ -6672,13 +6806,15 @@ log_heap_update(Relation reln, Buffer oldbuf,
*/
if (need_tuple_data)
{
rdata[3].next = &(rdata[4]);
rdata[nr - 1].next = &(rdata[nr]);
rdata[nr].data = NULL,
rdata[nr].len = 0;
rdata[nr].buffer = newbufref;
rdata[nr].buffer_std = true;
rdata[nr].next = NULL;
nr++;
rdata[4].data = NULL,
rdata[4].len = 0;
rdata[4].buffer = newbuf;
rdata[4].buffer_std = true;
rdata[4].next = NULL;
xlrec.flags |= XLOG_HEAP_CONTAINS_NEW_TUPLE;
/* We need to log a tuple identity */
......@@ -6690,19 +6826,21 @@ log_heap_update(Relation reln, Buffer oldbuf,
xlhdr_idx.header.t_hoff = old_key_tuple->t_data->t_hoff;
xlhdr_idx.t_len = old_key_tuple->t_len;
rdata[4].next = &(rdata[5]);
rdata[5].data = (char *) &xlhdr_idx;
rdata[5].len = SizeOfHeapHeaderLen;
rdata[5].buffer = InvalidBuffer;
rdata[5].next = &(rdata[6]);
rdata[nr - 1].next = &(rdata[nr]);
rdata[nr].data = (char *) &xlhdr_idx;
rdata[nr].len = SizeOfHeapHeaderLen;
rdata[nr].buffer = InvalidBuffer;
rdata[nr].next = &(rdata[nr + 1]);
nr++;
/* PG73FORMAT: write bitmap [+ padding] [+ oid] + data */
rdata[6].data = (char *) old_key_tuple->t_data
rdata[nr].data = (char *) old_key_tuple->t_data
+ offsetof(HeapTupleHeaderData, t_bits);
rdata[6].len = old_key_tuple->t_len
rdata[nr].len = old_key_tuple->t_len
- offsetof(HeapTupleHeaderData, t_bits);
rdata[6].buffer = InvalidBuffer;
rdata[6].next = NULL;
rdata[nr].buffer = InvalidBuffer;
rdata[nr].next = NULL;
nr++;
if (reln->rd_rel->relreplident == REPLICA_IDENTITY_FULL)
xlrec.flags |= XLOG_HEAP_CONTAINS_OLD_TUPLE;
......@@ -6711,19 +6849,6 @@ log_heap_update(Relation reln, Buffer oldbuf,
}
}
/* If new tuple is the single and first tuple on page... */
if (ItemPointerGetOffsetNumber(&(newtup->t_self)) == FirstOffsetNumber &&
PageGetMaxOffsetNumber(page) == FirstOffsetNumber)
{
XLogRecData *rcur = &rdata[2];
info |= XLOG_HEAP_INIT_PAGE;
while (rcur != NULL)
{
rcur->buffer = InvalidBuffer;
rcur = rcur->next;
}
}
recptr = XLogInsert(RM_HEAP_ID, info, rdata);
return recptr;
......@@ -7750,17 +7875,25 @@ heap_xlog_update(XLogRecPtr lsn, XLogRecord *record, bool hot_update)
Page page;
OffsetNumber offnum;
ItemId lp = NULL;
HeapTupleData oldtup;
HeapTupleHeader htup;
char *recdata;
uint16 prefixlen = 0,
suffixlen = 0;
char *newp;
struct
{
HeapTupleHeaderData hdr;
char data[MaxHeapTupleSize];
} tbuf;
xl_heap_header_len xlhdr;
int hsize;
uint32 newlen;
Size freespace;
/* initialize to keep the compiler quiet */
oldtup.t_data = NULL;
oldtup.t_len = 0;
/*
* The visibility map may need to be fixed even if the heap page is
* already up-to-date.
......@@ -7827,6 +7960,9 @@ heap_xlog_update(XLogRecPtr lsn, XLogRecord *record, bool hot_update)
htup = (HeapTupleHeader) PageGetItem(page, lp);
oldtup.t_data = htup;
oldtup.t_len = ItemIdGetLength(lp);
htup->t_infomask &= ~(HEAP_XMAX_BITS | HEAP_MOVED);
htup->t_infomask2 &= ~HEAP_KEYS_UPDATED;
if (hot_update)
......@@ -7925,20 +8061,63 @@ newsame:;
if (PageGetMaxOffsetNumber(page) + 1 < offnum)
elog(PANIC, "heap_update_redo: invalid max offset number");
hsize = SizeOfHeapUpdate + SizeOfHeapHeaderLen;
recdata = (char *) xlrec + SizeOfHeapUpdate;
memcpy((char *) &xlhdr,
(char *) xlrec + SizeOfHeapUpdate,
SizeOfHeapHeaderLen);
newlen = xlhdr.t_len;
Assert(newlen <= MaxHeapTupleSize);
if (xlrec->flags & XLOG_HEAP_PREFIX_FROM_OLD)
{
memcpy(&prefixlen, recdata, sizeof(uint16));
recdata += sizeof(uint16);
}
if (xlrec->flags & XLOG_HEAP_SUFFIX_FROM_OLD)
{
memcpy(&suffixlen, recdata, sizeof(uint16));
recdata += sizeof(uint16);
}
memcpy((char *) &xlhdr, recdata, SizeOfHeapHeaderLen);
recdata += SizeOfHeapHeaderLen;
Assert(xlhdr.t_len + prefixlen + suffixlen <= MaxHeapTupleSize);
htup = &tbuf.hdr;
MemSet((char *) htup, 0, sizeof(HeapTupleHeaderData));
/* PG73FORMAT: get bitmap [+ padding] [+ oid] + data */
memcpy((char *) htup + offsetof(HeapTupleHeaderData, t_bits),
(char *) xlrec + hsize,
newlen);
newlen += offsetof(HeapTupleHeaderData, t_bits);
/*
* Reconstruct the new tuple using the prefix and/or suffix from the old
* tuple, and the data stored in the WAL record.
*/
newp = (char *) htup + offsetof(HeapTupleHeaderData, t_bits);
if (prefixlen > 0)
{
int len;
/* copy bitmap [+ padding] [+ oid] from WAL record */
len = xlhdr.header.t_hoff - offsetof(HeapTupleHeaderData, t_bits);
memcpy(newp, recdata, len);
recdata += len;
newp += len;
/* copy prefix from old tuple */
memcpy(newp, (char *) oldtup.t_data + oldtup.t_data->t_hoff, prefixlen);
newp += prefixlen;
/* copy new tuple data from WAL record */
len = xlhdr.t_len - (xlhdr.header.t_hoff - offsetof(HeapTupleHeaderData, t_bits));
memcpy(newp, recdata, len);
recdata += len;
newp += len;
}
else
{
/* copy bitmap [+ padding] [+ oid] + data from record, all in one go */
memcpy(newp, recdata, xlhdr.t_len);
recdata += xlhdr.t_len;
newp += xlhdr.t_len;
}
/* copy suffix from old tuple */
if (suffixlen > 0)
memcpy(newp, (char *) oldtup.t_data + oldtup.t_len - suffixlen, suffixlen);
newlen = offsetof(HeapTupleHeaderData, t_bits) + xlhdr.t_len + prefixlen + suffixlen;
htup->t_infomask2 = xlhdr.header.t_infomask2;
htup->t_infomask = xlhdr.header.t_infomask;
htup->t_hoff = xlhdr.header.t_hoff;
......
......@@ -2335,6 +2335,29 @@ XLogRecPtrToBytePos(XLogRecPtr ptr)
return result;
}
/*
* Determine whether the buffer referenced has to be backed up.
*
* Since we don't yet have the insert lock, fullPageWrites and forcePageWrites
* could change later, so the result should be used for optimization purposes
* only.
*/
bool
XLogCheckBufferNeedsBackup(Buffer buffer)
{
bool doPageWrites;
Page page;
page = BufferGetPage(buffer);
doPageWrites = XLogCtl->Insert.fullPageWrites || XLogCtl->Insert.forcePageWrites;
if (doPageWrites && PageGetLSN(page) <= RedoRecPtr)
return true; /* buffer requires backup */
return false; /* buffer does not need to be backed up */
}
/*
* Determine whether the buffer referenced by an XLogRecData item has to
* be backed up, and if so fill a BkpBlock struct for it. In any case
......
......@@ -67,6 +67,8 @@
#define XLOG_HEAP_CONTAINS_OLD_TUPLE (1<<2)
#define XLOG_HEAP_CONTAINS_OLD_KEY (1<<3)
#define XLOG_HEAP_CONTAINS_NEW_TUPLE (1<<4)
#define XLOG_HEAP_PREFIX_FROM_OLD (1<<5)
#define XLOG_HEAP_SUFFIX_FROM_OLD (1<<6)
/* convenience macro for checking whether any form of old tuple was logged */
#define XLOG_HEAP_CONTAINS_OLD \
......@@ -179,7 +181,22 @@ typedef struct xl_heap_update
ItemPointerData newtid; /* new inserted tuple id */
uint8 old_infobits_set; /* infomask bits to set on old tuple */
uint8 flags;
/* NEW TUPLE xl_heap_header AND TUPLE DATA FOLLOWS AT END OF STRUCT */
/*
* If XLOG_HEAP_PREFIX_FROM_OLD or XLOG_HEAP_SUFFIX_FROM_OLD flags are
* set, the prefix and/or suffix come next, as one or two uint16s.
*
* After that, xl_heap_header_len and new tuple data follow. The new
* tuple data and length don't include the prefix and suffix, which are
* copied from the old tuple on replay. The new tuple data is omitted if
* a full-page image of the page was taken (unless the
* XLOG_HEAP_CONTAINS_NEW_TUPLE flag is set, in which case it's included
* anyway).
*
* If XLOG_HEAP_CONTAINS_OLD_TUPLE or XLOG_HEAP_CONTAINS_OLD_KEY flags are
* set, another xl_heap_header_len struct and tuple data for the old tuple
* follows.
*/
} xl_heap_update;
#define SizeOfHeapUpdate (offsetof(xl_heap_update, flags) + sizeof(uint8))
......
......@@ -279,6 +279,7 @@ typedef struct CheckpointStatsData
extern CheckpointStatsData CheckpointStats;
extern XLogRecPtr XLogInsert(RmgrId rmid, uint8 info, XLogRecData *rdata);
extern bool XLogCheckBufferNeedsBackup(Buffer buffer);
extern void XLogFlush(XLogRecPtr RecPtr);
extern bool XLogBackgroundFlush(void);
extern bool XLogNeedsFlush(XLogRecPtr RecPtr);
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment