Commit 68689c66 authored by Tom Lane's avatar Tom Lane

Micro-optimize GenericXLogFinish().

Make the inner comparison loops of computeDelta() as tight as possible by
pulling considerations of valid and invalid ranges out of the inner loops,
and extending a match or non-match detection as far as possible before
deciding what to do next.  To keep this tractable, give up the possibility
of merging fragments across the pd_lower to pd_upper gap.  The fraction of
pages where that could happen (ie, there are 4 or fewer bytes in the gap,
*and* data changes immediately adjacent to it on both sides) is too small
to be worth spending cycles on.

Also, avoid two BLCKSZ-length memcpy()s by computing the delta before
moving data into the target buffer, instead of after.  This doesn't save
nearly as many cycles as being tenser about computeDelta(), but it still
seems worth doing.

On my machine, this patch cuts a full 40% off the runtime of
contrib/bloom's regression test.
parent c7a141a9
......@@ -34,14 +34,17 @@
* the delta. For this reason, we merge adjacent fragments if the unchanged
* region between them is <= MATCH_THRESHOLD bytes.
*
* The worst case for delta sizes occurs when we did not find any unchanged
* region in the page. The size of the delta will be the size of the page plus
* the size of the fragment header in that case.
* We do not bother to merge fragments across the "lower" and "upper" parts
* of a page; it's very seldom the case that pd_lower and pd_upper are within
* MATCH_THRESHOLD bytes of each other, and handling that infrequent case
* would complicate and slow down the delta-computation code unduly.
* Therefore, the worst-case delta size includes two fragment headers plus
* a full page's worth of data.
*-------------------------------------------------------------------------
*/
#define FRAGMENT_HEADER_SIZE (2 * sizeof(OffsetNumber))
#define MATCH_THRESHOLD FRAGMENT_HEADER_SIZE
#define MAX_DELTA_SIZE (BLCKSZ + FRAGMENT_HEADER_SIZE)
#define MAX_DELTA_SIZE (BLCKSZ + 2 * FRAGMENT_HEADER_SIZE)
/* Struct of generic xlog data for single page */
typedef struct
......@@ -62,7 +65,11 @@ struct GenericXLogState
static void writeFragment(PageData *pageData, OffsetNumber offset,
OffsetNumber len, const char *data);
static void computeDelta(PageData *pageData);
static void computeRegionDelta(PageData *pageData,
const char *curpage, const char *targetpage,
int targetStart, int targetEnd,
int validStart, int validEnd);
static void computeDelta(PageData *pageData, Page curpage, Page targetpage);
static void applyPageRedo(Page page, const char *delta, Size deltaSize);
......@@ -94,102 +101,155 @@ writeFragment(PageData *pageData, OffsetNumber offset, OffsetNumber length,
}
/*
* Compute the delta record for given page.
* Compute the XLOG fragments needed to transform a region of curpage into the
* corresponding region of targetpage, and append them to pageData's delta
* field. The region to transform runs from targetStart to targetEnd-1.
* Bytes in curpage outside the range validStart to validEnd-1 should be
* considered invalid, and always overwritten with target data.
*
* This function is a hot spot, so it's worth being as tense as possible
* about the data-matching loops.
*/
static void
computeDelta(PageData *pageData)
computeRegionDelta(PageData *pageData,
const char *curpage, const char *targetpage,
int targetStart, int targetEnd,
int validStart, int validEnd)
{
Page page = BufferGetPage(pageData->buffer, NULL, NULL,
BGP_NO_SNAPSHOT_TEST),
image = (Page) pageData->image;
int i,
loopEnd,
fragmentBegin = -1,
fragmentEnd = -1;
uint16 pageLower = ((PageHeader) page)->pd_lower,
pageUpper = ((PageHeader) page)->pd_upper,
imageLower = ((PageHeader) image)->pd_lower,
imageUpper = ((PageHeader) image)->pd_upper;
pageData->deltaLen = 0;
for (i = 0; i < BLCKSZ; i++)
/* Deal with any invalid start region by including it in first fragment */
if (validStart > targetStart)
{
bool match;
/*
* Check if bytes in old and new page images match. We do not care
* about data in the unallocated area between pd_lower and pd_upper.
* We assume the unallocated area to expand with unmatched bytes.
* Bytes inside the unallocated area are assumed to always match.
*/
if (i < pageLower)
{
if (i < imageLower)
match = (page[i] == image[i]);
else
match = false;
}
else if (i >= pageUpper)
{
if (i >= imageUpper)
match = (page[i] == image[i]);
else
match = false;
}
else
{
match = true;
}
fragmentBegin = targetStart;
targetStart = validStart;
}
if (match)
{
if (fragmentBegin >= 0)
{
/* Matched byte is potentially part of a fragment. */
if (fragmentEnd < 0)
fragmentEnd = i;
/* We'll deal with any invalid end region after the main loop */
loopEnd = Min(targetEnd, validEnd);
/*
* Write next fragment if sequence of matched bytes is longer
* than MATCH_THRESHOLD.
*/
if (i - fragmentEnd >= MATCH_THRESHOLD)
{
writeFragment(pageData, fragmentBegin,
fragmentEnd - fragmentBegin,
page + fragmentBegin);
fragmentBegin = -1;
fragmentEnd = -1;
}
}
}
else
/* Examine all the potentially matchable bytes */
i = targetStart;
while (i < loopEnd)
{
if (curpage[i] != targetpage[i])
{
/* On unmatched byte, start new fragment if it is not done yet */
/* On unmatched byte, start new fragment if not already in one */
if (fragmentBegin < 0)
fragmentBegin = i;
/* Mark unmatched-data endpoint as uncertain */
fragmentEnd = -1;
/* Extend the fragment as far as possible in a tight loop */
i++;
while (i < loopEnd && curpage[i] != targetpage[i])
i++;
if (i >= loopEnd)
break;
}
/* Found a matched byte, so remember end of unmatched fragment */
fragmentEnd = i;
/*
* Extend the match as far as possible in a tight loop. (On typical
* workloads, this inner loop is the bulk of this function's runtime.)
*/
i++;
while (i < loopEnd && curpage[i] == targetpage[i])
i++;
/*
* There are several possible cases at this point:
*
* 1. We have no unwritten fragment (fragmentBegin < 0). There's
* nothing to write; and it doesn't matter what fragmentEnd is.
*
* 2. We found more than MATCH_THRESHOLD consecutive matching bytes.
* Dump out the unwritten fragment, stopping at fragmentEnd.
*
* 3. The match extends to loopEnd. We'll do nothing here, exit the
* loop, and then dump the unwritten fragment, after merging it with
* the invalid end region if any. If we don't so merge, fragmentEnd
* establishes how much the final writeFragment call needs to write.
*
* 4. We found an unmatched byte before loopEnd. The loop will repeat
* and will enter the unmatched-byte stanza above. So in this case
* also, it doesn't matter what fragmentEnd is. The matched bytes
* will get merged into the continuing unmatched fragment.
*
* Only in case 3 do we reach the bottom of the loop with a meaningful
* fragmentEnd value, which is why it's OK that we unconditionally
* assign "fragmentEnd = i" above.
*/
if (fragmentBegin >= 0 && i - fragmentEnd > MATCH_THRESHOLD)
{
writeFragment(pageData, fragmentBegin,
fragmentEnd - fragmentBegin,
targetpage + fragmentBegin);
fragmentBegin = -1;
fragmentEnd = -1; /* not really necessary */
}
}
/* Deal with any invalid end region by including it in final fragment */
if (loopEnd < targetEnd)
{
if (fragmentBegin < 0)
fragmentBegin = loopEnd;
fragmentEnd = targetEnd;
}
/* Write final fragment if any */
if (fragmentBegin >= 0)
{
if (fragmentEnd < 0)
fragmentEnd = targetEnd;
writeFragment(pageData, fragmentBegin,
BLCKSZ - fragmentBegin,
page + fragmentBegin);
fragmentEnd - fragmentBegin,
targetpage + fragmentBegin);
}
}
/*
* Compute the XLOG delta record needed to transform curpage into targetpage,
* and store it in pageData's delta field.
*/
static void
computeDelta(PageData *pageData, Page curpage, Page targetpage)
{
int targetLower = ((PageHeader) targetpage)->pd_lower,
targetUpper = ((PageHeader) targetpage)->pd_upper,
curLower = ((PageHeader) curpage)->pd_lower,
curUpper = ((PageHeader) curpage)->pd_upper;
pageData->deltaLen = 0;
/* Compute delta records for lower part of page ... */
computeRegionDelta(pageData, curpage, targetpage,
0, targetLower,
0, curLower);
/* ... and for upper part, ignoring what's between */
computeRegionDelta(pageData, curpage, targetpage,
targetUpper, BLCKSZ,
curUpper, BLCKSZ);
/*
* If xlog debug is enabled, then check produced delta. Result of delta
* application to saved image should be the same as current page state.
* application to curpage should be equivalent to targetpage.
*/
#ifdef WAL_DEBUG
if (XLOG_DEBUG)
{
char tmp[BLCKSZ];
memcpy(tmp, image, BLCKSZ);
memcpy(tmp, curpage, BLCKSZ);
applyPageRedo(tmp, pageData->delta, pageData->deltaLen);
if (memcmp(tmp, page, pageLower) != 0 ||
memcmp(tmp + pageUpper, page + pageUpper, BLCKSZ - pageUpper) != 0)
if (memcmp(tmp, targetpage, targetLower) != 0 ||
memcmp(tmp + targetUpper, targetpage + targetUpper,
BLCKSZ - targetUpper) != 0)
elog(ERROR, "result of generic xlog apply does not match");
}
#endif
......@@ -264,7 +324,7 @@ GenericXLogRegister(GenericXLogState *state, Buffer buffer, bool isNew)
XLogRecPtr
GenericXLogFinish(GenericXLogState *state)
{
XLogRecPtr lsn = InvalidXLogRecPtr;
XLogRecPtr lsn;
int i;
if (state->isLogged)
......@@ -278,7 +338,6 @@ GenericXLogFinish(GenericXLogState *state)
{
PageData *pageData = &state->pages[i];
Page page;
char tmp[BLCKSZ];
if (BufferIsInvalid(pageData->buffer))
continue;
......@@ -286,14 +345,10 @@ GenericXLogFinish(GenericXLogState *state)
page = BufferGetPage(pageData->buffer, NULL, NULL,
BGP_NO_SNAPSHOT_TEST);
/* Swap current and saved page image. */
memcpy(tmp, pageData->image, BLCKSZ);
memcpy(pageData->image, page, BLCKSZ);
memcpy(page, tmp, BLCKSZ);
if (pageData->fullImage)
{
/* A full page image does not require anything special */
memcpy(page, pageData->image, BLCKSZ);
XLogRegisterBuffer(i, pageData->buffer, REGBUF_FORCE_IMAGE);
}
else
......@@ -302,8 +357,9 @@ GenericXLogFinish(GenericXLogState *state)
* In normal mode, calculate delta and write it as xlog data
* associated with this page.
*/
computeDelta(pageData, page, (Page) pageData->image);
memcpy(page, pageData->image, BLCKSZ);
XLogRegisterBuffer(i, pageData->buffer, REGBUF_STANDARD);
computeDelta(pageData);
XLogRegisterBufData(i, pageData->delta, pageData->deltaLen);
}
}
......@@ -341,6 +397,8 @@ GenericXLogFinish(GenericXLogState *state)
MarkBufferDirty(pageData->buffer);
}
END_CRIT_SECTION();
/* We don't have a LSN to return, in this case */
lsn = InvalidXLogRecPtr;
}
pfree(state);
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment